From 7ce47fd1827fac8be1afc90afbdccc21fc4ae24d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 25 Sep 2023 11:18:15 -0700 Subject: [PATCH] Move server running state to atomic to avoid contention at NRG layer. Signed-off-by: Derek Collison --- server/client.go | 4 ++-- server/events.go | 8 ++++---- server/jetstream_cluster.go | 2 +- server/mqtt.go | 2 +- server/route.go | 2 +- server/server.go | 13 +++++-------- server/server_test.go | 4 +--- server/websocket.go | 2 +- 8 files changed, 16 insertions(+), 21 deletions(-) diff --git a/server/client.go b/server/client.go index 25b393a7..7a521047 100644 --- a/server/client.go +++ b/server/client.go @@ -3735,7 +3735,7 @@ func (c *client) processInboundMsg(msg []byte) { } } -// selectMappedSubject will chose the mapped subject based on the client's inbound subject. +// selectMappedSubject will choose the mapped subject based on the client's inbound subject. func (c *client) selectMappedSubject() bool { nsubj, changed := c.acc.selectMappedSubject(string(c.pa.subject)) if changed { @@ -5225,7 +5225,7 @@ func (c *client) reconnect() { // It is possible that the server is being shutdown. // If so, don't try to reconnect - if !srv.running { + if !srv.isRunning() { return } diff --git a/server/events.go b/server/events.go index 3b157cef..bb249d43 100644 --- a/server/events.go +++ b/server/events.go @@ -715,7 +715,7 @@ func (s *Server) eventsRunning() bool { return false } s.mu.RLock() - er := s.running && s.eventsEnabled() + er := s.isRunning() && s.eventsEnabled() s.mu.RUnlock() return er } @@ -739,7 +739,7 @@ func (s *Server) eventsEnabled() bool { func (s *Server) TrackedRemoteServers() int { s.mu.RLock() defer s.mu.RUnlock() - if !s.running || !s.eventsEnabled() { + if !s.isRunning() || !s.eventsEnabled() { return -1 } return len(s.sys.servers) @@ -1484,7 +1484,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su // Should do normal updates before bailing if wrong domain. s.mu.Lock() - if s.running && s.eventsEnabled() && ssm.Server.ID != s.info.ID { + if s.isRunning() && s.eventsEnabled() && ssm.Server.ID != s.info.ID { s.updateRemoteServer(&si) } s.mu.Unlock() @@ -1943,7 +1943,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, sub s.mu.Lock() // check again here if we have been shutdown. - if !s.running || !s.eventsEnabled() { + if !s.isRunning() || !s.eventsEnabled() { s.mu.Unlock() return } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c766856f..4c2a7a1d 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -183,7 +183,7 @@ const ( func (s *Server) trackedJetStreamServers() (js, total int) { s.mu.RLock() defer s.mu.RUnlock() - if !s.running || !s.eventsEnabled() { + if !s.isRunning() || !s.eventsEnabled() { return -1, -1 } s.nodeToInfo.Range(func(k, v interface{}) bool { diff --git a/server/mqtt.go b/server/mqtt.go index bc93ca5d..b285af35 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -499,7 +499,7 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client { c.mu.Unlock() s.mu.Lock() - if !s.running || s.ldm { + if !s.isRunning() || s.ldm { if s.shutdown { conn.Close() } diff --git a/server/route.go b/server/route.go index dfda145b..ff5d94c7 100644 --- a/server/route.go +++ b/server/route.go @@ -1834,7 +1834,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string id := info.ID s.mu.Lock() - if !s.running || s.routesReject { + if !s.isRunning() || s.routesReject { s.mu.Unlock() return false } diff --git a/server/server.go b/server/server.go index 55d271e0..4e401fc2 100644 --- a/server/server.go +++ b/server/server.go @@ -131,7 +131,7 @@ type Server struct { configFile string optsMu sync.RWMutex opts *Options - running bool + running atomic.Bool shutdown bool listener net.Listener listenerErr error @@ -1482,10 +1482,7 @@ func (s *Server) Running() bool { // Protected check on running state func (s *Server) isRunning() bool { - s.mu.RLock() - running := s.running - s.mu.RUnlock() - return running + return s.running.Load() } func (s *Server) logPid() error { @@ -2083,8 +2080,8 @@ func (s *Server) Start() { s.checkAuthforWarnings() // Avoid RACE between Start() and Shutdown() + s.running.Store(true) s.mu.Lock() - s.running = true // Update leafNodeEnabled in case options have changed post NewServer() // and before Start() (we should not be able to allow that, but server has // direct reference to user-provided options - at least before a Reload() is @@ -2375,7 +2372,7 @@ func (s *Server) Shutdown() { opts := s.getOpts() s.shutdown = true - s.running = false + s.running.Store(false) s.grMu.Lock() s.grRunning = false s.grMu.Unlock() @@ -3041,7 +3038,7 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client { // list of connections to close. It won't contain this one, so we need // to bail out now otherwise the readLoop started down there would not // be interrupted. Skip also if in lame duck mode. - if !s.running || s.ldm { + if !s.isRunning() || s.ldm { // There are some tests that create a server but don't start it, // and use "async" clients and perform the parsing manually. Such // clients would branch here (since server is not running). However, diff --git a/server/server_test.go b/server/server_test.go index 7bac1cc3..4030aafa 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1222,9 +1222,7 @@ func TestServerValidateGatewaysOptions(t *testing.T) { func TestAcceptError(t *testing.T) { o := DefaultOptions() s := New(o) - s.mu.Lock() - s.running = true - s.mu.Unlock() + s.running.Store(true) defer s.Shutdown() orgDelay := time.Hour delay := s.acceptError("Test", fmt.Errorf("any error"), orgDelay) diff --git a/server/websocket.go b/server/websocket.go index 6bf82305..4eb93977 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1220,7 +1220,7 @@ func (s *Server) createWSClient(conn net.Conn, ws *websocket) *client { c.mu.Unlock() s.mu.Lock() - if !s.running || s.ldm { + if !s.isRunning() || s.ldm { if s.shutdown { conn.Close() }