Added start time to Statsz from server.

Added in more debug for imports processing.
Changed subs reporting for Statsz.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-12-19 13:19:00 -08:00
parent da05c45d7e
commit cc5873cd72
5 changed files with 33 additions and 9 deletions

View File

@@ -368,7 +368,7 @@ func (a *Account) autoExpireResponseMaps() []*serviceImport {
// Add a route to connect from an implicit route created for a response to a request.
// This does no checks and should be only called by the msg processing code. Use
// addServiceImport from above if responding to user input or config, etc.
// addServiceImport from above if responding to user input or config changes, etc.
func (a *Account) addImplicitServiceImport(destination *Account, from, to string, autoexpire bool, claim *jwt.Import) error {
a.mu.Lock()
if a.imports.services == nil {
@@ -826,6 +826,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
if a == nil {
return
}
s.Debugf("Updating account claims: %s", a.Name)
a.checkExpiration(ac.Claims())
// Clone to update, only select certain fields.
@@ -848,10 +849,12 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
for _, e := range ac.Exports {
switch e.Type {
case jwt.Stream:
s.Debugf("Adding stream export %q for %s", e.Subject, a.Name)
if err := a.AddStreamExport(string(e.Subject), authAccounts(e.TokenReq)); err != nil {
s.Debugf("Error adding stream export to account [%s]: %v", a.Name, err.Error())
}
case jwt.Service:
s.Debugf("Adding service export %q for %s", e.Subject, a.Name)
if err := a.AddServiceExport(string(e.Subject), authAccounts(e.TokenReq)); err != nil {
s.Debugf("Error adding service export to account [%s]: %v", a.Name, err.Error())
}
@@ -867,9 +870,15 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
}
switch i.Type {
case jwt.Stream:
a.AddStreamImportWithClaim(acc, string(i.Subject), string(i.To), i)
s.Debugf("Adding stream import %s:%q for %s:%q", acc.Name, i.Subject, a.Name, i.To)
if err := a.AddStreamImportWithClaim(acc, string(i.Subject), string(i.To), i); err != nil {
s.Debugf("Error adding stream import to account [%s]: %v", a.Name, err.Error())
}
case jwt.Service:
a.AddServiceImportWithClaim(acc, string(i.Subject), string(i.To), i)
s.Debugf("Adding service import %s:%q for %s:%q", acc.Name, i.Subject, a.Name, i.To)
if err := a.AddServiceImportWithClaim(acc, string(i.Subject), string(i.To), i); err != nil {
s.Debugf("Error adding service import to account [%s]: %v", a.Name, err.Error())
}
}
}
// Now let's apply any needed changes from import/export changes.

View File

@@ -126,6 +126,7 @@ type ClientInfo struct {
// Various statistics we will periodically send out.
type ServerStats struct {
Start time.Time `json:"start"`
Mem int64 `json:"mem"`
Cores int `json:"cores"`
CPU float64 `json:"cpu"`
@@ -339,6 +340,7 @@ func routeStat(r *client) *RouteStat {
func (s *Server) sendStatsz(subj string) {
m := ServerStatsMsg{}
updateServerUsage(&m.Stats)
m.Stats.Start = s.start
m.Stats.Connections = len(s.clients)
m.Stats.TotalConnections = s.totalClients
m.Stats.ActiveAccounts = s.activeAccounts
@@ -347,7 +349,8 @@ func (s *Server) sendStatsz(subj string) {
m.Stats.Sent.Msgs = atomic.LoadInt64(&s.outMsgs)
m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes)
m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
m.Stats.NumSubs = s.gacc.sl.Count()
m.Stats.NumSubs = s.numSubscriptions()
for _, r := range s.routes {
m.Stats.Routes = append(m.Stats.Routes, routeStat(r))
}

View File

@@ -1033,9 +1033,11 @@ func TestSystemAccountWithGateways(t *testing.T) {
}
}
func TestServerEventsStatsZ(t *testing.T) {
preStart := time.Now()
sa, optsA, sb, _, akp := runTrustedCluster(t)
defer sa.Shutdown()
defer sb.Shutdown()
postStart := time.Now()
url := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
ncs, err := nats.Connect(url, createUserCreds(t, sa, akp))
@@ -1078,7 +1080,9 @@ func TestServerEventsStatsZ(t *testing.T) {
if m.Server.Version != VERSION {
t.Fatalf("Did not match server version")
}
if !m.Stats.Start.After(preStart) && m.Stats.Start.Before(postStart) {
t.Fatalf("Got a wrong start time for the server %v", m.Stats.Start)
}
if m.Stats.Connections != 1 {
t.Fatalf("Did not match connections of 1, got %d", m.Stats.Connections)
}

View File

@@ -1375,7 +1375,7 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
// e.g.
// {stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"}
// {stream: {account: "synadia", subject:"synadia.private.*"}}
// {service: {account: "synadia", subject: "pub.special.request"}, subject: "synadia.request"}
// {service: {account: "synadia", subject: "pub.special.request"}, to: "synadia.request"}
func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*importStream, *importService, error) {
var (
curStream *importStream

View File

@@ -780,11 +780,13 @@ func (s *Server) fetchRawAccountClaims(name string) (string, error) {
claimJWT, err := accResolver.Fetch(name)
fetchTime := time.Since(start)
s.mu.Lock()
s.Debugf("Account resolver fetch time was %v\n", fetchTime)
if fetchTime > time.Second {
s.Warnf("Account resolver took %v to fetch account", fetchTime)
s.Warnf("Account Fetch: %s in %v\n", name, fetchTime)
} else {
s.Debugf("Account Fetch: %s in %v\n", name, fetchTime)
}
if err != nil {
s.Warnf("Account Fetch Failed: %v\n", err)
return "", err
}
return claimJWT, nil
@@ -1684,13 +1686,19 @@ func (s *Server) getClient(cid uint64) *client {
// NumSubscriptions will report how many subscriptions are active.
func (s *Server) NumSubscriptions() uint32 {
s.mu.Lock()
defer s.mu.Unlock()
return s.numSubscriptions()
}
// numSubscriptions will report how many subscriptions are active.
// Lock should be held.
func (s *Server) numSubscriptions() uint32 {
var subs int
for _, acc := range s.accounts {
if acc.sl != nil {
subs += acc.TotalSubs()
}
}
s.mu.Unlock()
return uint32(subs)
}