mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fixed DATA RACE and ensure route is not created/accepted on shutdown
- Created a setter for the closed flag. - Check if route is closed under lock and set a boolean if so, so we don't check c.route outside of c's mutex. - Ensure that we do not create a route on shutdown, which would leave a connection hanging (was seen in some config reload tests).
This commit is contained in:
@@ -1321,14 +1321,15 @@ func (c *client) closeConnection() {
|
||||
}
|
||||
srv := c.srv
|
||||
|
||||
retryImplicit := false
|
||||
var (
|
||||
routeClosed bool
|
||||
retryImplicit bool
|
||||
)
|
||||
if c.route != nil {
|
||||
retryImplicit = c.route.retry
|
||||
}
|
||||
|
||||
closed := false
|
||||
if c.route != nil {
|
||||
closed = c.route.closed
|
||||
routeClosed = c.route.closed
|
||||
if !routeClosed {
|
||||
retryImplicit = c.route.retry
|
||||
}
|
||||
}
|
||||
|
||||
c.mu.Unlock()
|
||||
@@ -1349,7 +1350,7 @@ func (c *client) closeConnection() {
|
||||
}
|
||||
|
||||
// Don't reconnect routes that are being closed.
|
||||
if c.route != nil && closed {
|
||||
if routeClosed {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1387,6 +1388,16 @@ func (c *client) closeConnection() {
|
||||
}
|
||||
}
|
||||
|
||||
// If the client is a route connection, sets the `closed` flag to true
|
||||
// to prevent any reconnecting attempt when c.closeConnection() is called.
|
||||
func (c *client) setRouteNoReconnectOnClose() {
|
||||
c.mu.Lock()
|
||||
if c.route != nil {
|
||||
c.route.closed = true
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Logging functionality scoped to a client or route.
|
||||
|
||||
func (c *client) Errorf(format string, v ...interface{}) {
|
||||
|
||||
@@ -265,10 +265,8 @@ func (r *routesOption) Apply(server *Server) {
|
||||
for _, remove := range r.remove {
|
||||
for _, client := range routes {
|
||||
if client.route.url == remove {
|
||||
client.mu.Lock()
|
||||
// Do not attempt to reconnect when route is removed.
|
||||
client.route.closed = true
|
||||
client.mu.Unlock()
|
||||
client.setRouteNoReconnectOnClose()
|
||||
client.closeConnection()
|
||||
server.Noticef("Removed route %v", remove)
|
||||
}
|
||||
@@ -597,9 +595,7 @@ func (s *Server) reloadAuthorization() {
|
||||
for _, client := range routes {
|
||||
// Disconnect any unauthorized routes.
|
||||
if !s.isRouterAuthorized(client) {
|
||||
client.mu.Lock()
|
||||
client.route.closed = true
|
||||
client.mu.Unlock()
|
||||
client.setRouteNoReconnectOnClose()
|
||||
client.authViolation()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -405,8 +405,17 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
// uinterrupted, causing the Shutdown() to wait indefinitively.
|
||||
// We need to store the client in a special map, under a special lock.
|
||||
s.grMu.Lock()
|
||||
s.grTmpClients[c.cid] = c
|
||||
running := s.grRunning
|
||||
if running {
|
||||
s.grTmpClients[c.cid] = c
|
||||
}
|
||||
s.grMu.Unlock()
|
||||
if !running {
|
||||
c.mu.Unlock()
|
||||
c.setRouteNoReconnectOnClose()
|
||||
c.closeConnection()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Spin up the read loop.
|
||||
s.startGoRoutine(func() { c.readLoop() })
|
||||
|
||||
@@ -314,6 +314,7 @@ func (s *Server) Shutdown() {
|
||||
s.grMu.Unlock()
|
||||
// Copy off the routes
|
||||
for i, r := range s.routes {
|
||||
r.setRouteNoReconnectOnClose()
|
||||
conns[i] = r
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user