Merge pull request #831 from nats-io/statsz

Added statsz support
This commit is contained in:
Derek Collison
2018-12-04 09:14:16 -08:00
committed by GitHub
3 changed files with 324 additions and 42 deletions

View File

@@ -21,16 +21,21 @@ import (
"sync"
"sync/atomic"
"time"
"github.com/nats-io/gnatsd/server/pse"
)
const (
connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT"
disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT"
accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS"
accConnsReqSubj = "$SYS.REQ.ACCOUNT.%s.CONNS"
accUpdateEventSubj = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE"
connsRespSubj = "$SYS._INBOX_.%s"
accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS"
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
serverStatsSubj = "$SYS.SERVER.%s.STATSZ"
serverStatsReqSubj = "$SYS.REQ.SERVER.%s.STATSZ"
shutdownEventTokens = 4
serverSubjectIndex = 2
accUpdateTokens = 5
@@ -45,11 +50,19 @@ type internal struct {
sid uint64
servers map[string]*serverUpdate
sweeper *time.Timer
stmr *time.Timer
subs map[string]msgHandler
sendq chan *pubMsg
wg sync.WaitGroup
orphMax time.Duration
chkOrph time.Duration
statsz time.Duration
}
// ServerStatsMsg is sent periodically with stats updates.
type ServerStatsMsg struct {
Server ServerInfo `json:"server"`
Stats ServerStats `json:"statsz"`
}
// ConnectEventMsg is sent when a new connection is made that is part of an account.
@@ -103,6 +116,37 @@ type ClientInfo struct {
Stop *time.Time `json:"stop,omitempty"`
}
// Various statistics we will periodically send out.
type ServerStats struct {
Mem int64 `json:"mem"`
Cores int `json:"cores"`
CPU float64 `json:"cpu"`
Connections int `json:"connections"`
TotalConnections uint64 `json:"total_connections"`
ActiveAccounts int `json:"active_accounts"`
NumSubs uint32 `json:"subscriptions"`
Sent DataStats `json:"sent"`
Received DataStats `json:"received"`
SlowConsumers int64 `json:"slow_consumers"`
Routes []*RouteStat `json:"routes,omitempty"`
Gateways []*GatewayStat `json:"gateways,omitempty"`
}
type RouteStat struct {
ID uint64 `json:"rid"`
Sent DataStats `json:"sent"`
Received DataStats `json:"received"`
Pending int `json:"pending"`
}
type GatewayStat struct {
ID uint64 `json:"gwid"`
Name string `json:"name"`
Sent DataStats `json:"sent"`
Received DataStats `json:"received"`
NumInbound int `json:"inbound_connections"`
}
// DataStats reports how may msg and bytes. Applicable for both sent and received.
type DataStats struct {
Msgs int64 `json:"msgs"`
@@ -217,12 +261,8 @@ func (s *Server) eventsEnabled() bool {
}
// Check for orphan servers who may have gone away without notification.
// This should be wrapChk() to setup common locking.
func (s *Server) checkRemoteServers() {
s.mu.Lock()
defer s.mu.Unlock()
if !s.eventsEnabled() {
return
}
now := time.Now()
for sid, su := range s.sys.servers {
if now.Sub(su.ltime) > s.sys.orphMax {
@@ -237,14 +277,103 @@ func (s *Server) checkRemoteServers() {
}
}
// Start a ticker that will fire periodically and check for orphaned servers.
func (s *Server) startRemoteServerSweepTimer() {
s.mu.Lock()
defer s.mu.Unlock()
if !s.eventsEnabled() {
return
// Grab RSS and PCPU
func updateServerUsage(v *ServerStats) {
var rss, vss int64
var pcpu float64
pse.ProcUsage(&pcpu, &rss, &vss)
v.Mem = rss
v.CPU = pcpu
v.Cores = numCores
}
// Generate a route stat for our statz update.
func routeStat(r *client) *RouteStat {
if r == nil {
return nil
}
s.sys.sweeper = time.AfterFunc(s.sys.chkOrph, s.checkRemoteServers)
r.mu.Lock()
rs := &RouteStat{
ID: r.cid,
Sent: DataStats{
Msgs: atomic.LoadInt64(&r.outMsgs),
Bytes: atomic.LoadInt64(&r.outBytes),
},
Received: DataStats{
Msgs: atomic.LoadInt64(&r.inMsgs),
Bytes: atomic.LoadInt64(&r.inBytes),
},
Pending: int(r.out.pb),
}
r.mu.Unlock()
return rs
}
// Actual send method for statz updates.
// Lock should be held.
func (s *Server) sendStatsz(subj string) {
m := ServerStatsMsg{}
updateServerUsage(&m.Stats)
m.Stats.Connections = len(s.clients)
m.Stats.TotalConnections = s.totalClients
m.Stats.ActiveAccounts = s.activeAccounts
m.Stats.Received.Msgs = atomic.LoadInt64(&s.inMsgs)
m.Stats.Received.Bytes = atomic.LoadInt64(&s.inBytes)
m.Stats.Sent.Msgs = atomic.LoadInt64(&s.outMsgs)
m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes)
m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
m.Stats.NumSubs = s.gacc.sl.Count()
for _, r := range s.routes {
m.Stats.Routes = append(m.Stats.Routes, routeStat(r))
}
if s.gateway.enabled {
gw := s.gateway
gw.RLock()
for name, c := range gw.out {
gs := &GatewayStat{Name: name}
c.mu.Lock()
gs.ID = c.cid
gs.Sent = DataStats{
Msgs: atomic.LoadInt64(&c.outMsgs),
Bytes: atomic.LoadInt64(&c.outBytes),
}
c.mu.Unlock()
// Gather matching inbound connections
gs.Received = DataStats{}
for _, c := range gw.in {
c.mu.Lock()
if c.gw.name == name {
gs.Received.Msgs += atomic.LoadInt64(&c.inMsgs)
gs.Received.Bytes += atomic.LoadInt64(&c.inBytes)
gs.NumInbound++
}
c.mu.Unlock()
}
m.Stats.Gateways = append(m.Stats.Gateways, gs)
}
gw.RUnlock()
}
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
}
// Send out our statz update.
// This should be wrapChk() to setup common locking.
func (s *Server) heartbeatStatsz() {
if s.sys.stmr != nil {
s.sys.stmr.Reset(s.sys.statsz)
}
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
}
// This should be wrapChk() to setup common locking.
func (s *Server) startStatszTimer() {
s.sys.stmr = time.AfterFunc(s.sys.statsz, s.wrapChk(s.heartbeatStatsz))
}
// Start a ticker that will fire periodically and check for orphaned servers.
// This should be wrapChk() to setup common locking.
func (s *Server) startRemoteServerSweepTimer() {
s.sys.sweeper = time.AfterFunc(s.sys.chkOrph, s.wrapChk(s.checkRemoteServers))
}
// This will setup our system wide tracking subs.
@@ -282,6 +411,11 @@ func (s *Server) initEventTracking() {
s.Errorf("Error setting up internal tracking: %v", err)
}
// Listen for requests for our statz
subject = fmt.Sprintf(serverStatsReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.statszReq); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
}
// accountClaimUpdate will receive claim updates for accounts.
@@ -313,7 +447,7 @@ func (s *Server) processRemoteServerShutdown(sid string) {
}
}
// serverShutdownEvent is called when we get an event from another server shutting down.
// remoteServerShutdownEvent is called when we get an event from another server shutting down.
func (s *Server) remoteServerShutdown(sub *subscription, subject, reply string, msg []byte) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -360,10 +494,8 @@ func (s *Server) shutdownEventing() {
}
s.mu.Lock()
if s.sys.sweeper != nil {
s.sys.sweeper.Stop()
s.sys.sweeper = nil
}
clearTimer(&s.sys.sweeper)
clearTimer(&s.sys.stmr)
s.mu.Unlock()
// We will queue up a shutdown event and wait for the
@@ -395,14 +527,13 @@ func (s *Server) shutdownEventing() {
s.sys = nil
}
// Request for our local connection count.
func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []byte) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.eventsEnabled() {
return
}
m := accNumConnsReq{}
if err := json.Unmarshal(msg, &m); err != nil {
s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err)
@@ -417,15 +548,23 @@ func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []by
}
}
// statszReq is a request for us to respond with current statz.
func (s *Server) statszReq(sub *subscription, subject, reply string, msg []byte) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.eventsEnabled() || reply == _EMPTY_ {
return
}
s.sendStatsz(reply)
}
// remoteConnsUpdate gets called when we receive a remote update from another server.
func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg []byte) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.eventsEnabled() {
return
}
m := accNumConns{}
if err := json.Unmarshal(msg, &m); err != nil {
s.sys.client.Errorf("Error unmarshalling account connection event message: %v", err)
@@ -475,7 +614,7 @@ func (s *Server) enableAccountTracking(a *Account) {
}
// FIXME(dlc) - make configurable.
const AccountConnHBInterval = 30 * time.Second
const eventsHBInterval = 30 * time.Second
// sendAccConnsUpdate is called to send out our information on the
// account's local connections.
@@ -484,8 +623,15 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
if !s.eventsEnabled() || a == nil || a == s.sys.account || a == s.gacc {
return
}
// Update timer first
a.mu.Lock()
// Check to see if we have an HB running and update.
if a.ctmr == nil {
a.etmr = time.AfterFunc(eventsHBInterval, func() { s.accConnsUpdate(a) })
} else {
a.etmr.Reset(eventsHBInterval)
}
// If no limits set, don't update, no need to.
if a.mconns == 0 {
a.mu.Unlock()
@@ -496,12 +642,6 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
Account: a.Name,
Conns: len(a.clients),
}
// Check to see if we have an HB running and update.
if a.ctmr == nil {
a.etmr = time.AfterFunc(AccountConnHBInterval, func() { s.accConnsUpdate(a) })
} else {
a.etmr.Reset(AccountConnHBInterval)
}
a.mu.Unlock()
s.sendInternalMsg(subj, "", &m.Server, &m)
@@ -647,6 +787,8 @@ func (s *Server) sysUnsubscribe(sub *subscription) {
c.unsubscribe(acc, sub, true)
}
// flushClients will make sure toi flush any clients we may have
// sent to during sendInternalMsg.
func (c *client) flushClients() {
last := time.Now()
for cp := range c.pcd {
@@ -661,9 +803,31 @@ func (c *client) flushClients() {
}
}
// Helper to grab name for a client.
func nameForClient(c *client) string {
if c.user != nil {
return c.user.Nkey
}
return "N/A"
}
// Helper to clear timers.
func clearTimer(tp **time.Timer) {
if t := *tp; t != nil {
t.Stop()
*tp = nil
}
}
// Helper function to wrap functions with common test
// to lock server and return if events not enabled.
func (s *Server) wrapChk(f func()) func() {
return func() {
s.mu.Lock()
defer s.mu.Unlock()
if !s.eventsEnabled() {
return
}
f()
}
}

View File

@@ -69,7 +69,7 @@ func runTrustedServer(t *testing.T) (*Server, *Options) {
return s, opts
}
func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options) {
func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
t.Helper()
kp, _ := nkeys.FromSeed(oSeed)
@@ -102,7 +102,7 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options) {
checkClusterFormed(t, sa, sb)
return sa, optsA, sb, optsB
return sa, optsA, sb, optsB, akp
}
func runTrustedGateways(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) {
@@ -382,7 +382,7 @@ func TestSystemAccountInternalSubscriptions(t *testing.T) {
}
func TestSystemAccountConnectionLimits(t *testing.T) {
sa, optsA, sb, optsB := runTrustedCluster(t)
sa, optsA, sb, optsB, _ := runTrustedCluster(t)
defer sa.Shutdown()
defer sb.Shutdown()
@@ -427,7 +427,7 @@ func TestSystemAccountConnectionLimits(t *testing.T) {
// Test that the remote accounting works when a server is started some time later.
func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) {
sa, optsA, sb, optsB := runTrustedCluster(t)
sa, optsA, sb, optsB, _ := runTrustedCluster(t)
defer sa.Shutdown()
sb.Shutdown()
@@ -459,8 +459,8 @@ func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) {
checkClusterFormed(t, sa, sb)
// Trigger a load of the user account on the new server
// NOTE: If we do not load the user can be the first to request this
// account, hence the connection will succeed.
// NOTE: If we do not load the user, the user can be the first
// to request this account, hence the connection will succeed.
sb.LookupAccount(pub)
// Expect this to fail.
@@ -472,7 +472,7 @@ func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) {
// Test that the remote accounting works when a server is shutdown.
func TestSystemAccountConnectionLimitsServerShutdownGraceful(t *testing.T) {
sa, optsA, sb, optsB := runTrustedCluster(t)
sa, optsA, sb, optsB, _ := runTrustedCluster(t)
defer sa.Shutdown()
defer sb.Shutdown()
@@ -523,7 +523,7 @@ func TestSystemAccountConnectionLimitsServerShutdownGraceful(t *testing.T) {
// Test that the remote accounting works when a server goes away.
func TestSystemAccountConnectionLimitsServerShutdownForced(t *testing.T) {
sa, optsA, sb, optsB := runTrustedCluster(t)
sa, optsA, sb, optsB, _ := runTrustedCluster(t)
defer sa.Shutdown()
// Let's create a user account.
@@ -791,7 +791,9 @@ func TestSystemAccountWithGateways(t *testing.T) {
sub, _ := nca.SubscribeSync("$SYS.ACCOUNT.>")
defer sub.Unsubscribe()
nca.Flush()
checkExpectedSubs(t, 6, sa)
// If this tests fails with wrong number after 10 seconds we may have
// added a new inititial subscription for the eventing system.
checkExpectedSubs(t, 7, sa)
// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
@@ -816,3 +818,115 @@ func TestSystemAccountWithGateways(t *testing.T) {
t.Fatalf("Expected %q for account, got %q", accName, account)
}
}
func TestServerEventStatsZ(t *testing.T) {
sa, optsA, sb, _, akp := runTrustedCluster(t)
defer sa.Shutdown()
defer sb.Shutdown()
url := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
ncs, err := nats.Connect(url, createUserCreds(t, sa, akp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer ncs.Close()
subj := fmt.Sprintf(serverStatsSubj, sa.ID())
sub, _ := ncs.SubscribeSync(subj)
defer sub.Unsubscribe()
ncs.Publish("foo", []byte("HELLO WORLD"))
ncs.Flush()
// Let's speed up the checking process.
sa.mu.Lock()
sa.sys.statsz = 10 * time.Millisecond
sa.sys.stmr.Reset(sa.sys.statsz)
sa.mu.Unlock()
_, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
// Get it the second time so we can check some stats
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
m := ServerStatsMsg{}
if err := json.Unmarshal(msg.Data, &m); err != nil {
t.Fatalf("Error unmarshalling the statz json: %v", err)
}
if m.Server.ID != sa.ID() {
t.Fatalf("Did not match IDs")
}
if m.Stats.Connections != 1 {
t.Fatalf("Did not match connections of 1, got %d", m.Stats.Connections)
}
if m.Stats.ActiveAccounts != 2 {
t.Fatalf("Did not match active accounts of 2, got %d", m.Stats.ActiveAccounts)
}
if m.Stats.Sent.Msgs != 1 {
t.Fatalf("Did not match sent msgs of 1, got %d", m.Stats.Sent.Msgs)
}
if m.Stats.Received.Msgs != 1 {
t.Fatalf("Did not match received msgs of 1, got %d", m.Stats.Received.Msgs)
}
if lr := len(m.Stats.Routes); lr != 1 {
t.Fatalf("Expected a route, but got %d", lr)
}
// Now let's prompt this server to send us the statsz
subj = fmt.Sprintf(serverStatsReqSubj, sa.ID())
msg, err = ncs.Request(subj, nil, time.Second)
if err != nil {
t.Fatalf("Error trying to request statsz: %v", err)
}
m2 := ServerStatsMsg{}
if err := json.Unmarshal(msg.Data, &m2); err != nil {
t.Fatalf("Error unmarshalling the statz json: %v", err)
}
if m2.Server.ID != sa.ID() {
t.Fatalf("Did not match IDs")
}
if m2.Stats.Connections != 1 {
t.Fatalf("Did not match connections of 1, got %d", m2.Stats.Connections)
}
if m2.Stats.ActiveAccounts != 2 {
t.Fatalf("Did not match active accounts of 2, got %d", m2.Stats.ActiveAccounts)
}
if m2.Stats.Sent.Msgs != 3 {
t.Fatalf("Did not match sent msgs of 3, got %d", m2.Stats.Sent.Msgs)
}
if m2.Stats.Received.Msgs != 1 {
t.Fatalf("Did not match received msgs of 1, got %d", m2.Stats.Received.Msgs)
}
if lr := len(m2.Stats.Routes); lr != 1 {
t.Fatalf("Expected a route, but got %d", lr)
}
msg, err = ncs.Request(subj, nil, time.Second)
if err != nil {
t.Fatalf("Error trying to request statsz: %v", err)
}
m3 := ServerStatsMsg{}
if err := json.Unmarshal(msg.Data, &m3); err != nil {
t.Fatalf("Error unmarshalling the statz json: %v", err)
}
if m3.Server.ID != sa.ID() {
t.Fatalf("Did not match IDs")
}
if m3.Stats.Connections != 1 {
t.Fatalf("Did not match connections of 1, got %d", m3.Stats.Connections)
}
if m3.Stats.ActiveAccounts != 2 {
t.Fatalf("Did not match active accounts of 2, got %d", m3.Stats.ActiveAccounts)
}
if m3.Stats.Sent.Msgs != 5 {
t.Fatalf("Did not match sent msgs of 5, got %d", m3.Stats.Sent.Msgs)
}
if m3.Stats.Received.Msgs != 2 {
t.Fatalf("Did not match received msgs of 2, got %d", m3.Stats.Received.Msgs)
}
if lr := len(m3.Stats.Routes); lr != 1 {
t.Fatalf("Expected a route, but got %d", lr)
}
}

View File

@@ -580,8 +580,9 @@ func (s *Server) setSystemAccount(acc *Account) error {
servers: make(map[string]*serverUpdate),
subs: make(map[string]msgHandler),
sendq: make(chan *pubMsg, 128),
orphMax: 5 * AccountConnHBInterval,
chkOrph: 3 * AccountConnHBInterval,
statsz: eventsHBInterval,
orphMax: 5 * eventsHBInterval,
chkOrph: 3 * eventsHBInterval,
}
s.sys.client.initClient()
s.sys.client.echo = false
@@ -599,7 +600,10 @@ func (s *Server) setSystemAccount(acc *Account) error {
s.initEventTracking()
// Track for dead remote servers.
s.startRemoteServerSweepTimer()
s.wrapChk(s.startRemoteServerSweepTimer)()
// Send out statsz updates periodically.
s.wrapChk(s.startStatszTimer)()
return nil
}