From 760507222a0c36217ef6e806c2f511fa2cf72381 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 3 Dec 2018 21:01:17 -0800 Subject: [PATCH 1/3] Added statsz support Signed-off-by: Derek Collison --- server/events.go | 230 ++++++++++++++++++++++++++++++++++++------ server/events_test.go | 128 +++++++++++++++++++++-- server/server.go | 10 +- 3 files changed, 328 insertions(+), 40 deletions(-) diff --git a/server/events.go b/server/events.go index 29597780..a613721c 100644 --- a/server/events.go +++ b/server/events.go @@ -21,16 +21,21 @@ import ( "sync" "sync/atomic" "time" + + "github.com/nats-io/gnatsd/server/pse" ) const ( connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT" disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT" - accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS" accConnsReqSubj = "$SYS.REQ.ACCOUNT.%s.CONNS" accUpdateEventSubj = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE" connsRespSubj = "$SYS._INBOX_.%s" + accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS" shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN" + serverStatsSubj = "$SYS.SERVER.%s.STATSZ" + serverStatsReqSubj = "$SYS.REQ.SERVER.%s.STATSZ" + shutdownEventTokens = 4 serverSubjectIndex = 2 accUpdateTokens = 5 @@ -45,11 +50,19 @@ type internal struct { sid uint64 servers map[string]*serverUpdate sweeper *time.Timer + stmr *time.Timer subs map[string]msgHandler sendq chan *pubMsg wg sync.WaitGroup orphMax time.Duration chkOrph time.Duration + statsz time.Duration +} + +// ServerStatsMsg is sent periodically with stats updates. +type ServerStatsMsg struct { + Server ServerInfo `json:"server"` + Stats ServerStats `json:"statsz"` } // ConnectEventMsg is sent when a new connection is made that is part of an account. @@ -103,6 +116,37 @@ type ClientInfo struct { Stop *time.Time `json:"stop,omitempty"` } +// Various statistics we will periodically send out. +type ServerStats struct { + Mem int64 `json:"mem"` + Cores int `json:"cores"` + CPU float64 `json:"cpu"` + Connections int `json:"connections"` + TotalConnections uint64 `json:"total_connections"` + ActiveAccounts int `json:"active_accounts"` + NumSubs uint32 `json:"subscriptions"` + Sent DataStats `json:"sent"` + Received DataStats `json:"received"` + SlowConsumers int64 `json:"slow_consumers"` + Routes []*RouteStat `json:"routes,omitempty"` + Gateways []*GatewayStat `json:"gateways,omitempty"` +} + +type RouteStat struct { + ID uint64 `json:"rid"` + Sent DataStats `json:"sent"` + Received DataStats `json:"received"` + Pending int `json:"pending"` +} + +type GatewayStat struct { + ID uint64 `json:"gwid"` + Name string `json:"name"` + Sent DataStats `json:"sent"` + Received DataStats `json:"received"` + NumInbound int `json:"inbound_connections"` +} + // DataStats reports how may msg and bytes. Applicable for both sent and received. type DataStats struct { Msgs int64 `json:"msgs"` @@ -217,12 +261,8 @@ func (s *Server) eventsEnabled() bool { } // Check for orphan servers who may have gone away without notification. +// This should be wrapChk() to setup common locking. func (s *Server) checkRemoteServers() { - s.mu.Lock() - defer s.mu.Unlock() - if !s.eventsEnabled() { - return - } now := time.Now() for sid, su := range s.sys.servers { if now.Sub(su.ltime) > s.sys.orphMax { @@ -237,14 +277,111 @@ func (s *Server) checkRemoteServers() { } } -// Start a ticker that will fire periodically and check for orphaned servers. -func (s *Server) startRemoteServerSweepTimer() { - s.mu.Lock() - defer s.mu.Unlock() - if !s.eventsEnabled() { +// Grab RSS and PCPU +func updateServerUsage(v *ServerStats) { + var rss, vss int64 + var pcpu float64 + pse.ProcUsage(&pcpu, &rss, &vss) + v.Mem = rss + v.CPU = pcpu + v.Cores = numCores +} + +// Generate a route stat for our statz update. +func routeStat(r *client) *RouteStat { + if r == nil { + return nil + } + r.mu.Lock() + rs := &RouteStat{ + ID: r.cid, + Sent: DataStats{ + Msgs: r.outMsgs, + Bytes: r.outBytes, + }, + Received: DataStats{ + Msgs: r.inMsgs, + Bytes: r.inBytes, + }, + Pending: int(r.out.pb), + } + r.mu.Unlock() + return rs +} + +// Actual send method for statz updates. +// Lock should be held. +func (s *Server) sendStatsz(subj string) { + acc := s.sys.account + sc := s.sys.client + + r := acc.sl.Match(subj) + if noOutSideInterest(sc, r) { return } - s.sys.sweeper = time.AfterFunc(s.sys.chkOrph, s.checkRemoteServers) + + m := ServerStatsMsg{} + updateServerUsage(&m.Stats) + m.Stats.Connections = len(s.clients) + m.Stats.TotalConnections = s.totalClients + m.Stats.ActiveAccounts = s.activeAccounts + m.Stats.Received.Msgs = atomic.LoadInt64(&s.inMsgs) + m.Stats.Received.Bytes = atomic.LoadInt64(&s.inBytes) + m.Stats.Sent.Msgs = atomic.LoadInt64(&s.outMsgs) + m.Stats.Sent.Bytes = atomic.LoadInt64(&s.outBytes) + m.Stats.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) + m.Stats.NumSubs = s.gacc.sl.Count() + for _, r := range s.routes { + m.Stats.Routes = append(m.Stats.Routes, routeStat(r)) + } + if s.gateway.enabled { + gw := s.gateway + gw.RLock() + for name, c := range gw.out { + gs := &GatewayStat{Name: name} + c.mu.Lock() + gs.ID = c.cid + gs.Sent = DataStats{ + Msgs: c.outMsgs, + Bytes: c.outBytes, + } + c.mu.Unlock() + // Gather matching inbound connections + gs.Received = DataStats{} + for _, c := range gw.in { + c.mu.Lock() + if c.gw.name == name { + gs.Received.Msgs += c.inMsgs + gs.Received.Bytes += c.inBytes + gs.NumInbound++ + } + c.mu.Unlock() + } + m.Stats.Gateways = append(m.Stats.Gateways, gs) + } + gw.RUnlock() + } + s.sendInternalMsg(r, subj, _EMPTY_, &m.Server, &m) +} + +// Send out our statz update. +// This should be wrapChk() to setup common locking. +func (s *Server) heartbeatStatsz() { + if s.sys.stmr != nil { + s.sys.stmr.Reset(s.sys.statsz) + } + s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID)) +} + +// This should be wrapChk() to setup common locking. +func (s *Server) startStatszTimer() { + s.sys.stmr = time.AfterFunc(s.sys.statsz, s.wrapChk(s.heartbeatStatsz)) +} + +// Start a ticker that will fire periodically and check for orphaned servers. +// This should be wrapChk() to setup common locking. +func (s *Server) startRemoteServerSweepTimer() { + s.sys.sweeper = time.AfterFunc(s.sys.chkOrph, s.wrapChk(s.checkRemoteServers)) } // This will setup our system wide tracking subs. @@ -282,6 +419,11 @@ func (s *Server) initEventTracking() { s.Errorf("Error setting up internal tracking: %v", err) } + // Listen for requests for our statz + subject = fmt.Sprintf(serverStatsReqSubj, s.info.ID) + if _, err := s.sysSubscribe(subject, s.statszReq); err != nil { + s.Errorf("Error setting up internal tracking: %v", err) + } } // accountClaimUpdate will receive claim updates for accounts. @@ -313,7 +455,7 @@ func (s *Server) processRemoteServerShutdown(sid string) { } } -// serverShutdownEvent is called when we get an event from another server shutting down. +// remoteServerShutdownEvent is called when we get an event from another server shutting down. func (s *Server) remoteServerShutdown(sub *subscription, subject, reply string, msg []byte) { s.mu.Lock() defer s.mu.Unlock() @@ -360,10 +502,8 @@ func (s *Server) shutdownEventing() { } s.mu.Lock() - if s.sys.sweeper != nil { - s.sys.sweeper.Stop() - s.sys.sweeper = nil - } + clearTimer(&s.sys.sweeper) + clearTimer(&s.sys.stmr) s.mu.Unlock() // We will queue up a shutdown event and wait for the @@ -395,14 +535,13 @@ func (s *Server) shutdownEventing() { s.sys = nil } +// Request for our local connection count. func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []byte) { s.mu.Lock() defer s.mu.Unlock() - if !s.eventsEnabled() { return } - m := accNumConnsReq{} if err := json.Unmarshal(msg, &m); err != nil { s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err) @@ -417,15 +556,23 @@ func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []by } } +// statszReq is a request for us to respond with current statz. +func (s *Server) statszReq(sub *subscription, subject, reply string, msg []byte) { + s.mu.Lock() + defer s.mu.Unlock() + if !s.eventsEnabled() || reply == _EMPTY_ { + return + } + s.sendStatsz(reply) +} + // remoteConnsUpdate gets called when we receive a remote update from another server. func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg []byte) { s.mu.Lock() defer s.mu.Unlock() - if !s.eventsEnabled() { return } - m := accNumConns{} if err := json.Unmarshal(msg, &m); err != nil { s.sys.client.Errorf("Error unmarshalling account connection event message: %v", err) @@ -475,7 +622,7 @@ func (s *Server) enableAccountTracking(a *Account) { } // FIXME(dlc) - make configurable. -const AccountConnHBInterval = 30 * time.Second +const eventsHBInterval = 30 * time.Second // sendAccConnsUpdate is called to send out our information on the // account's local connections. @@ -484,8 +631,15 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) { if !s.eventsEnabled() || a == nil || a == s.sys.account || a == s.gacc { return } - + // Update timer first a.mu.Lock() + // Check to see if we have an HB running and update. + if a.ctmr == nil { + a.etmr = time.AfterFunc(eventsHBInterval, func() { s.accConnsUpdate(a) }) + } else { + a.etmr.Reset(eventsHBInterval) + } + // If no limits set, don't update, no need to. if a.mconns == 0 { a.mu.Unlock() @@ -496,12 +650,6 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) { Account: a.Name, Conns: len(a.clients), } - // Check to see if we have an HB running and update. - if a.ctmr == nil { - a.etmr = time.AfterFunc(AccountConnHBInterval, func() { s.accConnsUpdate(a) }) - } else { - a.etmr.Reset(AccountConnHBInterval) - } a.mu.Unlock() s.sendInternalMsg(subj, "", &m.Server, &m) @@ -647,6 +795,8 @@ func (s *Server) sysUnsubscribe(sub *subscription) { c.unsubscribe(acc, sub, true) } +// flushClients will make sure toi flush any clients we may have +// sent to during sendInternalMsg. func (c *client) flushClients() { last := time.Now() for cp := range c.pcd { @@ -661,9 +811,31 @@ func (c *client) flushClients() { } } +// Helper to grab name for a client. func nameForClient(c *client) string { if c.user != nil { return c.user.Nkey } return "N/A" } + +// Helper to clear timers. +func clearTimer(tp **time.Timer) { + if t := *tp; t != nil { + t.Stop() + *tp = nil + } +} + +// Helper function to wrap functions with common test +// to lock server and return if events not enabled. +func (s *Server) wrapChk(f func()) func() { + return func() { + s.mu.Lock() + defer s.mu.Unlock() + if !s.eventsEnabled() { + return + } + f() + } +} diff --git a/server/events_test.go b/server/events_test.go index ee091403..cc090cb1 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -69,7 +69,7 @@ func runTrustedServer(t *testing.T) (*Server, *Options) { return s, opts } -func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options) { +func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) { t.Helper() kp, _ := nkeys.FromSeed(oSeed) @@ -102,7 +102,7 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options) { checkClusterFormed(t, sa, sb) - return sa, optsA, sb, optsB + return sa, optsA, sb, optsB, akp } func runTrustedGateways(t *testing.T) (*Server, *Options, *Server, *Options, nkeys.KeyPair) { @@ -382,7 +382,7 @@ func TestSystemAccountInternalSubscriptions(t *testing.T) { } func TestSystemAccountConnectionLimits(t *testing.T) { - sa, optsA, sb, optsB := runTrustedCluster(t) + sa, optsA, sb, optsB, _ := runTrustedCluster(t) defer sa.Shutdown() defer sb.Shutdown() @@ -427,7 +427,7 @@ func TestSystemAccountConnectionLimits(t *testing.T) { // Test that the remote accounting works when a server is started some time later. func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) { - sa, optsA, sb, optsB := runTrustedCluster(t) + sa, optsA, sb, optsB, _ := runTrustedCluster(t) defer sa.Shutdown() sb.Shutdown() @@ -459,8 +459,8 @@ func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) { checkClusterFormed(t, sa, sb) // Trigger a load of the user account on the new server - // NOTE: If we do not load the user can be the first to request this - // account, hence the connection will succeed. + // NOTE: If we do not load the user, the user can be the first + // to request this account, hence the connection will succeed. sb.LookupAccount(pub) // Expect this to fail. @@ -472,7 +472,7 @@ func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) { // Test that the remote accounting works when a server is shutdown. func TestSystemAccountConnectionLimitsServerShutdownGraceful(t *testing.T) { - sa, optsA, sb, optsB := runTrustedCluster(t) + sa, optsA, sb, optsB, _ := runTrustedCluster(t) defer sa.Shutdown() defer sb.Shutdown() @@ -523,7 +523,7 @@ func TestSystemAccountConnectionLimitsServerShutdownGraceful(t *testing.T) { // Test that the remote accounting works when a server goes away. func TestSystemAccountConnectionLimitsServerShutdownForced(t *testing.T) { - sa, optsA, sb, optsB := runTrustedCluster(t) + sa, optsA, sb, optsB, _ := runTrustedCluster(t) defer sa.Shutdown() // Let's create a user account. @@ -816,3 +816,115 @@ func TestSystemAccountWithGateways(t *testing.T) { t.Fatalf("Expected %q for account, got %q", accName, account) } } +func TestServerEventStatusZ(t *testing.T) { + sa, optsA, sb, _, akp := runTrustedCluster(t) + defer sa.Shutdown() + defer sb.Shutdown() + + url := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port) + ncs, err := nats.Connect(url, createUserCreds(t, sa, akp)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncs.Close() + + subj := fmt.Sprintf(serverStatsSubj, sa.ID()) + sub, _ := ncs.SubscribeSync(subj) + defer sub.Unsubscribe() + ncs.Publish("foo", []byte("HELLO WORLD")) + ncs.Flush() + + // Let's speed up the checking process. + sa.mu.Lock() + sa.sys.statsz = 10 * time.Millisecond + sa.sys.stmr.Reset(sa.sys.statsz) + sa.mu.Unlock() + + _, err = sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + // Get it the second time so we can check some stats + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + m := ServerStatsMsg{} + if err := json.Unmarshal(msg.Data, &m); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + if m.Server.ID != sa.ID() { + t.Fatalf("Did not match IDs") + } + if m.Stats.Connections != 1 { + t.Fatalf("Did not match connections of 1, got %d", m.Stats.Connections) + } + if m.Stats.ActiveAccounts != 2 { + t.Fatalf("Did not match active accounts of 2, got %d", m.Stats.ActiveAccounts) + } + if m.Stats.Sent.Msgs != 1 { + t.Fatalf("Did not match sent msgs of 1, got %d", m.Stats.Sent.Msgs) + } + if m.Stats.Received.Msgs != 1 { + t.Fatalf("Did not match received msgs of 1, got %d", m.Stats.Received.Msgs) + } + if lr := len(m.Stats.Routes); lr != 1 { + t.Fatalf("Expected a route, but got %d", lr) + } + + // Now let's prompt this server to send us the statsz + subj = fmt.Sprintf(serverStatsReqSubj, sa.ID()) + msg, err = ncs.Request(subj, nil, time.Second) + if err != nil { + t.Fatalf("Error trying to request statsz: %v", err) + } + m2 := ServerStatsMsg{} + if err := json.Unmarshal(msg.Data, &m2); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + if m2.Server.ID != sa.ID() { + t.Fatalf("Did not match IDs") + } + if m2.Stats.Connections != 1 { + t.Fatalf("Did not match connections of 1, got %d", m2.Stats.Connections) + } + if m2.Stats.ActiveAccounts != 2 { + t.Fatalf("Did not match active accounts of 2, got %d", m2.Stats.ActiveAccounts) + } + if m2.Stats.Sent.Msgs != 3 { + t.Fatalf("Did not match sent msgs of 3, got %d", m2.Stats.Sent.Msgs) + } + if m2.Stats.Received.Msgs != 1 { + t.Fatalf("Did not match received msgs of 1, got %d", m2.Stats.Received.Msgs) + } + if lr := len(m2.Stats.Routes); lr != 1 { + t.Fatalf("Expected a route, but got %d", lr) + } + + msg, err = ncs.Request(subj, nil, time.Second) + if err != nil { + t.Fatalf("Error trying to request statsz: %v", err) + } + m3 := ServerStatsMsg{} + if err := json.Unmarshal(msg.Data, &m3); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + if m3.Server.ID != sa.ID() { + t.Fatalf("Did not match IDs") + } + if m3.Stats.Connections != 1 { + t.Fatalf("Did not match connections of 1, got %d", m3.Stats.Connections) + } + if m3.Stats.ActiveAccounts != 2 { + t.Fatalf("Did not match active accounts of 2, got %d", m3.Stats.ActiveAccounts) + } + if m3.Stats.Sent.Msgs != 5 { + t.Fatalf("Did not match sent msgs of 5, got %d", m3.Stats.Sent.Msgs) + } + if m3.Stats.Received.Msgs != 2 { + t.Fatalf("Did not match received msgs of 2, got %d", m3.Stats.Received.Msgs) + } + if lr := len(m3.Stats.Routes); lr != 1 { + t.Fatalf("Expected a route, but got %d", lr) + } +} diff --git a/server/server.go b/server/server.go index 3b0dc374..2a5e2525 100644 --- a/server/server.go +++ b/server/server.go @@ -580,8 +580,9 @@ func (s *Server) setSystemAccount(acc *Account) error { servers: make(map[string]*serverUpdate), subs: make(map[string]msgHandler), sendq: make(chan *pubMsg, 128), - orphMax: 5 * AccountConnHBInterval, - chkOrph: 3 * AccountConnHBInterval, + statsz: eventsHBInterval, + orphMax: 5 * eventsHBInterval, + chkOrph: 3 * eventsHBInterval, } s.sys.client.initClient() s.sys.client.echo = false @@ -599,7 +600,10 @@ func (s *Server) setSystemAccount(acc *Account) error { s.initEventTracking() // Track for dead remote servers. - s.startRemoteServerSweepTimer() + s.wrapChk(s.startRemoteServerSweepTimer)() + + // Send out statsz updates periodically. + s.wrapChk(s.startStatszTimer)() return nil } From f9912700c8cf7ee5430d94b4da87e4048f6e9f67 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 4 Dec 2018 08:48:40 -0800 Subject: [PATCH 2/3] Rebase from master Signed-off-by: Derek Collison --- server/events.go | 10 +--------- server/events_test.go | 4 +++- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/server/events.go b/server/events.go index a613721c..d29bca59 100644 --- a/server/events.go +++ b/server/events.go @@ -312,14 +312,6 @@ func routeStat(r *client) *RouteStat { // Actual send method for statz updates. // Lock should be held. func (s *Server) sendStatsz(subj string) { - acc := s.sys.account - sc := s.sys.client - - r := acc.sl.Match(subj) - if noOutSideInterest(sc, r) { - return - } - m := ServerStatsMsg{} updateServerUsage(&m.Stats) m.Stats.Connections = len(s.clients) @@ -361,7 +353,7 @@ func (s *Server) sendStatsz(subj string) { } gw.RUnlock() } - s.sendInternalMsg(r, subj, _EMPTY_, &m.Server, &m) + s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m) } // Send out our statz update. diff --git a/server/events_test.go b/server/events_test.go index cc090cb1..6dcab1e3 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -791,7 +791,9 @@ func TestSystemAccountWithGateways(t *testing.T) { sub, _ := nca.SubscribeSync("$SYS.ACCOUNT.>") defer sub.Unsubscribe() nca.Flush() - checkExpectedSubs(t, 6, sa) + // If this tests fails with wrong number after 10 seconds we may have + // added a new inititial subscription for the eventing system. + checkExpectedSubs(t, 7, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) From 53c70e6ce17d93557e4ca00996fdb030e746ee49 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 4 Dec 2018 09:09:27 -0800 Subject: [PATCH 3/3] Use atomic.Load Signed-off-by: Derek Collison --- server/events.go | 16 ++++++++-------- server/events_test.go | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/events.go b/server/events.go index d29bca59..f50891c0 100644 --- a/server/events.go +++ b/server/events.go @@ -296,12 +296,12 @@ func routeStat(r *client) *RouteStat { rs := &RouteStat{ ID: r.cid, Sent: DataStats{ - Msgs: r.outMsgs, - Bytes: r.outBytes, + Msgs: atomic.LoadInt64(&r.outMsgs), + Bytes: atomic.LoadInt64(&r.outBytes), }, Received: DataStats{ - Msgs: r.inMsgs, - Bytes: r.inBytes, + Msgs: atomic.LoadInt64(&r.inMsgs), + Bytes: atomic.LoadInt64(&r.inBytes), }, Pending: int(r.out.pb), } @@ -334,8 +334,8 @@ func (s *Server) sendStatsz(subj string) { c.mu.Lock() gs.ID = c.cid gs.Sent = DataStats{ - Msgs: c.outMsgs, - Bytes: c.outBytes, + Msgs: atomic.LoadInt64(&c.outMsgs), + Bytes: atomic.LoadInt64(&c.outBytes), } c.mu.Unlock() // Gather matching inbound connections @@ -343,8 +343,8 @@ func (s *Server) sendStatsz(subj string) { for _, c := range gw.in { c.mu.Lock() if c.gw.name == name { - gs.Received.Msgs += c.inMsgs - gs.Received.Bytes += c.inBytes + gs.Received.Msgs += atomic.LoadInt64(&c.inMsgs) + gs.Received.Bytes += atomic.LoadInt64(&c.inBytes) gs.NumInbound++ } c.mu.Unlock() diff --git a/server/events_test.go b/server/events_test.go index 6dcab1e3..16230b0c 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -818,7 +818,7 @@ func TestSystemAccountWithGateways(t *testing.T) { t.Fatalf("Expected %q for account, got %q", accName, account) } } -func TestServerEventStatusZ(t *testing.T) { +func TestServerEventStatsZ(t *testing.T) { sa, optsA, sb, _, akp := runTrustedCluster(t) defer sa.Shutdown() defer sb.Shutdown()