mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Merge branch 'master' into active_servers
This commit is contained in:
@@ -2822,6 +2822,16 @@ func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
s.updateAccountClaimsWithRefresh(a, ac, true)
|
||||
}
|
||||
|
||||
func (a *Account) traceLabel() string {
|
||||
if a == nil {
|
||||
return _EMPTY_
|
||||
}
|
||||
if a.nameTag != _EMPTY_ {
|
||||
return fmt.Sprintf("%s/%s", a.Name, a.nameTag)
|
||||
}
|
||||
return a.Name
|
||||
}
|
||||
|
||||
// updateAccountClaimsWithRefresh will update an existing account with new claims.
|
||||
// If refreshImportingAccounts is true it will also update incomplete dependent accounts
|
||||
// This will replace any exports or imports previously defined.
|
||||
@@ -2927,7 +2937,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
jsEnabled := s.JetStreamEnabled()
|
||||
if jsEnabled && a == s.SystemAccount() {
|
||||
for _, export := range allJsExports {
|
||||
s.Debugf("Adding jetstream service export %q for %s", export, a.Name)
|
||||
s.Debugf("Adding jetstream service export %q for %s", export, a.traceLabel())
|
||||
if err := a.AddServiceExport(export, nil); err != nil {
|
||||
s.Errorf("Error setting up jetstream service exports: %v", err)
|
||||
}
|
||||
@@ -2938,13 +2948,13 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
for _, e := range ac.Exports {
|
||||
switch e.Type {
|
||||
case jwt.Stream:
|
||||
s.Debugf("Adding stream export %q for %s", e.Subject, a.Name)
|
||||
s.Debugf("Adding stream export %q for %s", e.Subject, a.traceLabel())
|
||||
if err := a.addStreamExportWithAccountPos(
|
||||
string(e.Subject), authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil {
|
||||
s.Debugf("Error adding stream export to account [%s]: %v", a.Name, err.Error())
|
||||
s.Debugf("Error adding stream export to account [%s]: %v", a.traceLabel(), err.Error())
|
||||
}
|
||||
case jwt.Service:
|
||||
s.Debugf("Adding service export %q for %s", e.Subject, a.Name)
|
||||
s.Debugf("Adding service export %q for %s", e.Subject, a.traceLabel())
|
||||
rt := Singleton
|
||||
switch e.ResponseType {
|
||||
case jwt.ResponseTypeStream:
|
||||
@@ -2954,7 +2964,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
}
|
||||
if err := a.addServiceExportWithResponseAndAccountPos(
|
||||
string(e.Subject), rt, authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil {
|
||||
s.Debugf("Error adding service export to account [%s]: %v", a.Name, err)
|
||||
s.Debugf("Error adding service export to account [%s]: %v", a.traceLabel(), err)
|
||||
continue
|
||||
}
|
||||
sub := string(e.Subject)
|
||||
@@ -2964,13 +2974,13 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
if e.Latency.Sampling == jwt.Headers {
|
||||
hdrNote = " (using headers)"
|
||||
}
|
||||
s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, a.Name, err)
|
||||
s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, a.traceLabel(), err)
|
||||
}
|
||||
}
|
||||
if e.ResponseThreshold != 0 {
|
||||
// Response threshold was set in options.
|
||||
if err := a.SetServiceExportResponseThreshold(sub, e.ResponseThreshold); err != nil {
|
||||
s.Debugf("Error adding service export response threshold for [%s]: %v", a.Name, err)
|
||||
s.Debugf("Error adding service export response threshold for [%s]: %v", a.traceLabel(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3008,14 +3018,14 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
if i.LocalSubject != _EMPTY_ {
|
||||
// set local subject implies to is empty
|
||||
to = string(i.LocalSubject)
|
||||
s.Debugf("Adding stream import %s:%q for %s:%q", acc.Name, from, a.Name, to)
|
||||
s.Debugf("Adding stream import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
|
||||
err = a.AddMappedStreamImportWithClaim(acc, from, to, i)
|
||||
} else {
|
||||
s.Debugf("Adding stream import %s:%q for %s:%q", acc.Name, from, a.Name, to)
|
||||
s.Debugf("Adding stream import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
|
||||
err = a.AddStreamImportWithClaim(acc, from, to, i)
|
||||
}
|
||||
if err != nil {
|
||||
s.Debugf("Error adding stream import to account [%s]: %v", a.Name, err.Error())
|
||||
s.Debugf("Error adding stream import to account [%s]: %v", a.traceLabel(), err.Error())
|
||||
incompleteImports = append(incompleteImports, i)
|
||||
}
|
||||
case jwt.Service:
|
||||
@@ -3023,9 +3033,9 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
from = string(i.LocalSubject)
|
||||
to = string(i.Subject)
|
||||
}
|
||||
s.Debugf("Adding service import %s:%q for %s:%q", acc.Name, from, a.Name, to)
|
||||
s.Debugf("Adding service import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
|
||||
if err := a.AddServiceImportWithClaim(acc, from, to, i); err != nil {
|
||||
s.Debugf("Error adding service import to account [%s]: %v", a.Name, err.Error())
|
||||
s.Debugf("Error adding service import to account [%s]: %v", a.traceLabel(), err.Error())
|
||||
incompleteImports = append(incompleteImports, i)
|
||||
}
|
||||
}
|
||||
@@ -3168,7 +3178,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
// regardless of enabled or disabled. It handles both cases.
|
||||
if jsEnabled {
|
||||
if err := s.configJetStream(a); err != nil {
|
||||
s.Errorf("Error configuring jetstream for account [%s]: %v", a.Name, err.Error())
|
||||
s.Errorf("Error configuring jetstream for account [%s]: %v", a.traceLabel(), err.Error())
|
||||
a.mu.Lock()
|
||||
// Absent reload of js server cfg, this is going to be broken until js is disabled
|
||||
a.incomplete = true
|
||||
@@ -3235,6 +3245,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
acc.mu.RLock()
|
||||
incomplete := acc.incomplete
|
||||
name := acc.Name
|
||||
label := acc.traceLabel()
|
||||
// Must use jwt in account or risk failing on fetch
|
||||
// This jwt may not be the same that caused exportingAcc to be in incompleteAccExporterMap
|
||||
claimJWT := acc.claimJWT
|
||||
@@ -3250,7 +3261,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
// Since this account just got updated, the import itself may be in error. So trace that.
|
||||
if _, ok := s.incompleteAccExporterMap.Load(old.Name); ok {
|
||||
s.incompleteAccExporterMap.Delete(old.Name)
|
||||
s.Errorf("Account %s has issues importing account %s", name, old.Name)
|
||||
s.Errorf("Account %s has issues importing account %s", label, old.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1492,7 +1492,9 @@ func (c *client) markConnAsClosed(reason ClosedState) {
|
||||
// Be consistent with the creation: for routes, gateways and leaf,
|
||||
// we use Noticef on create, so use that too for delete.
|
||||
if c.srv != nil {
|
||||
if c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF {
|
||||
if c.kind == LEAF {
|
||||
c.Noticef("%s connection closed: %s account: %s", c.typeString(), reason, c.acc.traceLabel())
|
||||
} else if c.kind == ROUTER || c.kind == GATEWAY {
|
||||
c.Noticef("%s connection closed: %s", c.typeString(), reason)
|
||||
} else { // Client, System, Jetstream, and Account connections.
|
||||
c.Debugf("%s connection closed: %s", c.typeString(), reason)
|
||||
|
||||
@@ -200,6 +200,7 @@ type ServerStats struct {
|
||||
Routes []*RouteStat `json:"routes,omitempty"`
|
||||
Gateways []*GatewayStat `json:"gateways,omitempty"`
|
||||
ActiveServers int `json:"active_servers,omitempty"`
|
||||
JetStream *JetStreamVarz `json:"jetstream,omitempty"`
|
||||
}
|
||||
|
||||
// RouteStat holds route statistics.
|
||||
@@ -584,6 +585,30 @@ func (s *Server) sendStatsz(subj string) {
|
||||
if s.sys != nil {
|
||||
m.Stats.ActiveServers += len(s.sys.servers)
|
||||
}
|
||||
// JetStream
|
||||
if js := s.js; js != nil {
|
||||
jStat := &JetStreamVarz{}
|
||||
s.mu.Unlock()
|
||||
js.mu.RLock()
|
||||
c := js.config
|
||||
c.StoreDir = _EMPTY_
|
||||
jStat.Config = &c
|
||||
js.mu.RUnlock()
|
||||
jStat.Stats = js.usageStats()
|
||||
if mg := js.getMetaGroup(); mg != nil {
|
||||
if mg.Leader() {
|
||||
jStat.Meta = s.raftNodeToClusterInfo(mg)
|
||||
} else {
|
||||
// non leader only include a shortened version without peers
|
||||
jStat.Meta = &ClusterInfo{
|
||||
Name: s.ClusterName(),
|
||||
Leader: s.serverNameForNode(mg.GroupLeader()),
|
||||
}
|
||||
}
|
||||
}
|
||||
m.Stats.JetStream = jStat
|
||||
s.mu.Lock()
|
||||
}
|
||||
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
|
||||
}
|
||||
|
||||
|
||||
@@ -81,6 +81,7 @@ type jetStream struct {
|
||||
memReserved int64
|
||||
storeReserved int64
|
||||
apiCalls int64
|
||||
apiErrors int64
|
||||
memTotal int64
|
||||
storeTotal int64
|
||||
mu sync.RWMutex
|
||||
@@ -1544,27 +1545,13 @@ func (js *jetStream) dynamicAccountLimits() *JetStreamAccountLimits {
|
||||
// Report on JetStream stats and usage for this server.
|
||||
func (js *jetStream) usageStats() *JetStreamStats {
|
||||
var stats JetStreamStats
|
||||
|
||||
var _jsa [512]*jsAccount
|
||||
accounts := _jsa[:0]
|
||||
|
||||
js.mu.RLock()
|
||||
for _, jsa := range js.accounts {
|
||||
accounts = append(accounts, jsa)
|
||||
}
|
||||
stats.Accounts = len(js.accounts)
|
||||
js.mu.RUnlock()
|
||||
|
||||
stats.Accounts = len(accounts)
|
||||
|
||||
// Collect account information.
|
||||
for _, jsa := range accounts {
|
||||
jsa.mu.RLock()
|
||||
stats.Memory += uint64(jsa.usage.mem)
|
||||
stats.Store += uint64(jsa.usage.store)
|
||||
stats.API.Total += jsa.usage.api
|
||||
stats.API.Errors += jsa.usage.err
|
||||
jsa.mu.RUnlock()
|
||||
}
|
||||
stats.API.Total = (uint64)(atomic.LoadInt64(&js.apiCalls))
|
||||
stats.API.Errors = (uint64)(atomic.LoadInt64(&js.apiErrors))
|
||||
stats.Memory = (uint64)(atomic.LoadInt64(&js.memTotal))
|
||||
stats.Store = (uint64)(atomic.LoadInt64(&js.storeTotal))
|
||||
return &stats
|
||||
}
|
||||
|
||||
|
||||
@@ -818,7 +818,9 @@ func (a *Account) trackAPIErr() {
|
||||
jsa.usage.err++
|
||||
jsa.apiErrors++
|
||||
jsa.sendClusterUsageUpdate()
|
||||
js := jsa.js
|
||||
jsa.mu.Unlock()
|
||||
atomic.AddInt64(&js.apiErrors, 1)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -795,7 +795,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
|
||||
// Determines if we are soliciting the connection or not.
|
||||
var solicited bool
|
||||
var acc *Account
|
||||
|
||||
var remoteSuffix string
|
||||
if remote != nil {
|
||||
// TODO: Decide what should be the optimal behavior here.
|
||||
// For now, if lookup fails, we will constantly try
|
||||
@@ -814,13 +814,15 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
|
||||
acc, err = s.LookupAccount(lacc)
|
||||
if err != nil {
|
||||
s.Errorf("No local account %q for leafnode: %v", lacc, err)
|
||||
c.closeConnection(MissingAccount)
|
||||
return nil
|
||||
}
|
||||
remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.initClient()
|
||||
c.Noticef("Leafnode connection created")
|
||||
c.Noticef("Leafnode connection created%s", remoteSuffix)
|
||||
|
||||
if remote != nil {
|
||||
solicited = true
|
||||
|
||||
@@ -1054,6 +1054,7 @@ type Varz struct {
|
||||
type JetStreamVarz struct {
|
||||
Config *JetStreamConfig `json:"config,omitempty"`
|
||||
Stats *JetStreamStats `json:"stats,omitempty"`
|
||||
Meta *ClusterInfo `json:"meta,omitempty"`
|
||||
}
|
||||
|
||||
// ClusterOptsVarz contains monitoring cluster information
|
||||
@@ -1431,6 +1432,9 @@ func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64
|
||||
// FIXME(dlc) - We have lock inversion that needs to be fixed up properly.
|
||||
s.mu.Unlock()
|
||||
v.JetStream.Stats = s.js.usageStats()
|
||||
if mg := s.js.getMetaGroup(); mg != nil {
|
||||
v.JetStream.Meta = s.raftNodeToClusterInfo(mg)
|
||||
}
|
||||
s.mu.Lock()
|
||||
}
|
||||
}
|
||||
@@ -2371,6 +2375,24 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) {
|
||||
return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config), nil
|
||||
}
|
||||
|
||||
// helper to get cluster info from node via dummy group
|
||||
func (s *Server) raftNodeToClusterInfo(node RaftNode) *ClusterInfo {
|
||||
if node == nil {
|
||||
return nil
|
||||
}
|
||||
peers := node.Peers()
|
||||
peerList := make([]string, len(peers))
|
||||
for i, p := range node.Peers() {
|
||||
peerList[i] = p.ID
|
||||
}
|
||||
group := &raftGroup{
|
||||
Name: _EMPTY_,
|
||||
Peers: peerList,
|
||||
node: node,
|
||||
}
|
||||
return s.js.clusterInfo(group)
|
||||
}
|
||||
|
||||
// Jsz returns a Jsz structure containing information about JetStream.
|
||||
func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
|
||||
// set option defaults
|
||||
@@ -2405,23 +2427,6 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// helper to get cluster info from node via dummy group
|
||||
toClusterInfo := func(node RaftNode) *ClusterInfo {
|
||||
if node == nil {
|
||||
return nil
|
||||
}
|
||||
peers := node.Peers()
|
||||
peerList := make([]string, len(peers))
|
||||
for i, p := range node.Peers() {
|
||||
peerList[i] = p.ID
|
||||
}
|
||||
group := &raftGroup{
|
||||
Name: "",
|
||||
Peers: peerList,
|
||||
node: node,
|
||||
}
|
||||
return s.js.clusterInfo(group)
|
||||
}
|
||||
jsi := &JSInfo{
|
||||
ID: s.ID(),
|
||||
Now: time.Now().UTC(),
|
||||
@@ -2437,10 +2442,12 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
|
||||
for _, info := range s.js.accounts {
|
||||
accounts = append(accounts, info)
|
||||
}
|
||||
jsi.APICalls = atomic.LoadInt64(&s.js.apiCalls)
|
||||
s.js.mu.RUnlock()
|
||||
jsi.APICalls = atomic.LoadInt64(&s.js.apiCalls)
|
||||
|
||||
jsi.Meta = s.raftNodeToClusterInfo(s.js.getMetaGroup())
|
||||
jsi.JetStreamStats = *s.js.usageStats()
|
||||
|
||||
jsi.Meta = toClusterInfo(s.js.getMetaGroup())
|
||||
filterIdx := -1
|
||||
for i, jsa := range accounts {
|
||||
if jsa.acc().GetName() == opts.Account {
|
||||
@@ -2448,10 +2455,6 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
|
||||
}
|
||||
jsa.mu.RLock()
|
||||
jsi.Streams += len(jsa.streams)
|
||||
jsi.Memory += uint64(jsa.usage.mem)
|
||||
jsi.Store += uint64(jsa.usage.store)
|
||||
jsi.API.Total += jsa.usage.api
|
||||
jsi.API.Errors += jsa.usage.err
|
||||
for _, stream := range jsa.streams {
|
||||
streamState := stream.state()
|
||||
jsi.Messages += streamState.Msgs
|
||||
|
||||
Reference in New Issue
Block a user