diff --git a/server/accounts.go b/server/accounts.go index b9fa7d95..95645892 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -368,7 +368,7 @@ func (a *Account) autoExpireResponseMaps() []*serviceImport { // Add a route to connect from an implicit route created for a response to a request. // This does no checks and should be only called by the msg processing code. Use -// addServiceImport from above if responding to user input or config, etc. +// addServiceImport from above if responding to user input or config changes, etc. func (a *Account) addImplicitServiceImport(destination *Account, from, to string, autoexpire bool, claim *jwt.Import) error { a.mu.Lock() if a.imports.services == nil { @@ -826,6 +826,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { if a == nil { return } + s.Debugf("Updating account claims: %s", a.Name) a.checkExpiration(ac.Claims()) // Clone to update, only select certain fields. @@ -848,10 +849,12 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { for _, e := range ac.Exports { switch e.Type { case jwt.Stream: + s.Debugf("Adding stream export %q for %s", e.Subject, a.Name) if err := a.AddStreamExport(string(e.Subject), authAccounts(e.TokenReq)); err != nil { s.Debugf("Error adding stream export to account [%s]: %v", a.Name, err.Error()) } case jwt.Service: + s.Debugf("Adding service export %q for %s", e.Subject, a.Name) if err := a.AddServiceExport(string(e.Subject), authAccounts(e.TokenReq)); err != nil { s.Debugf("Error adding service export to account [%s]: %v", a.Name, err.Error()) } @@ -867,9 +870,15 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { } switch i.Type { case jwt.Stream: - a.AddStreamImportWithClaim(acc, string(i.Subject), string(i.To), i) + s.Debugf("Adding stream import %s:%q for %s:%q", acc.Name, i.Subject, a.Name, i.To) + if err := a.AddStreamImportWithClaim(acc, string(i.Subject), string(i.To), i); err != nil { + s.Debugf("Error adding stream import to account [%s]: %v", a.Name, err.Error()) + } case jwt.Service: - a.AddServiceImportWithClaim(acc, string(i.Subject), string(i.To), i) + s.Debugf("Adding service import %s:%q for %s:%q", acc.Name, i.Subject, a.Name, i.To) + if err := a.AddServiceImportWithClaim(acc, string(i.Subject), string(i.To), i); err != nil { + s.Debugf("Error adding service import to account [%s]: %v", a.Name, err.Error()) + } } } // Now let's apply any needed changes from import/export changes. diff --git a/server/events.go b/server/events.go index cd469e56..60ad0f4c 100644 --- a/server/events.go +++ b/server/events.go @@ -126,6 +126,7 @@ type ClientInfo struct { // Various statistics we will periodically send out. type ServerStats struct { + Start time.Time `json:"start"` Mem int64 `json:"mem"` Cores int `json:"cores"` CPU float64 `json:"cpu"` @@ -339,6 +340,7 @@ func routeStat(r *client) *RouteStat { func (s *Server) sendStatsz(subj string) { m := ServerStatsMsg{} updateServerUsage(&m.Stats) + m.Stats.Start = s.start m.Stats.Connections = len(s.clients) m.Stats.TotalConnections = s.totalClients m.Stats.ActiveAccounts = s.activeAccounts @@ -347,7 +349,8 @@ func (s *Server) sendStatsz(subj string) { m.Stats.Sent.Msgs = atomic.LoadInt64(&s.outMsgs) m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes) m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) - m.Stats.NumSubs = s.gacc.sl.Count() + m.Stats.NumSubs = s.numSubscriptions() + for _, r := range s.routes { m.Stats.Routes = append(m.Stats.Routes, routeStat(r)) } diff --git a/server/events_test.go b/server/events_test.go index 31a216fe..33a532d2 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1033,9 +1033,11 @@ func TestSystemAccountWithGateways(t *testing.T) { } } func TestServerEventsStatsZ(t *testing.T) { + preStart := time.Now() sa, optsA, sb, _, akp := runTrustedCluster(t) defer sa.Shutdown() defer sb.Shutdown() + postStart := time.Now() url := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port) ncs, err := nats.Connect(url, createUserCreds(t, sa, akp)) @@ -1078,7 +1080,9 @@ func TestServerEventsStatsZ(t *testing.T) { if m.Server.Version != VERSION { t.Fatalf("Did not match server version") } - + if !m.Stats.Start.After(preStart) && m.Stats.Start.Before(postStart) { + t.Fatalf("Got a wrong start time for the server %v", m.Stats.Start) + } if m.Stats.Connections != 1 { t.Fatalf("Did not match connections of 1, got %d", m.Stats.Connections) } diff --git a/server/opts.go b/server/opts.go index a5424075..66bdc6db 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1375,7 +1375,7 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo // e.g. // {stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"} // {stream: {account: "synadia", subject:"synadia.private.*"}} -// {service: {account: "synadia", subject: "pub.special.request"}, subject: "synadia.request"} +// {service: {account: "synadia", subject: "pub.special.request"}, to: "synadia.request"} func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*importStream, *importService, error) { var ( curStream *importStream diff --git a/server/server.go b/server/server.go index 36f191c6..3c00fd49 100644 --- a/server/server.go +++ b/server/server.go @@ -780,11 +780,13 @@ func (s *Server) fetchRawAccountClaims(name string) (string, error) { claimJWT, err := accResolver.Fetch(name) fetchTime := time.Since(start) s.mu.Lock() - s.Debugf("Account resolver fetch time was %v\n", fetchTime) if fetchTime > time.Second { - s.Warnf("Account resolver took %v to fetch account", fetchTime) + s.Warnf("Account Fetch: %s in %v\n", name, fetchTime) + } else { + s.Debugf("Account Fetch: %s in %v\n", name, fetchTime) } if err != nil { + s.Warnf("Account Fetch Failed: %v\n", err) return "", err } return claimJWT, nil @@ -1684,13 +1686,19 @@ func (s *Server) getClient(cid uint64) *client { // NumSubscriptions will report how many subscriptions are active. func (s *Server) NumSubscriptions() uint32 { s.mu.Lock() + defer s.mu.Unlock() + return s.numSubscriptions() +} + +// numSubscriptions will report how many subscriptions are active. +// Lock should be held. +func (s *Server) numSubscriptions() uint32 { var subs int for _, acc := range s.accounts { if acc.sl != nil { subs += acc.TotalSubs() } } - s.mu.Unlock() return uint32(subs) }