mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 18:50:41 -07:00
Ensure Shutdown() waits for outstanding routes go routines
We need to make sure that when Shutdown() returns, routes go routines that try to connect or reconnect have returned. Otherwise, this may affect tests running one after the other (a server from one test may connect to a server in the next test).
This commit is contained in:
@@ -1054,6 +1054,12 @@ func (c *client) closeConnection() {
|
||||
srv.mu.Lock()
|
||||
defer srv.mu.Unlock()
|
||||
|
||||
// It is possible that the server is being shutdown.
|
||||
// If so, don't try to reconnect
|
||||
if !srv.running {
|
||||
return
|
||||
}
|
||||
|
||||
if rid != "" && srv.remotes[rid] != nil {
|
||||
Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid)
|
||||
return
|
||||
@@ -1062,6 +1068,9 @@ func (c *client) closeConnection() {
|
||||
return
|
||||
} else if rtype != Implicit || retryImplicit {
|
||||
Debugf("Attempting reconnect for solicited route \"%s\"", rurl)
|
||||
// Keep track of this go-routine so we can wait for it on
|
||||
// server shutdown.
|
||||
srv.routeWG.Add(1)
|
||||
go srv.reConnectToRoute(rurl, rtype)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -191,6 +191,7 @@ func (s *Server) processImplicitRoute(info *Info) {
|
||||
if info.AuthRequired {
|
||||
r.User = url.UserPassword(s.opts.ClusterUsername, s.opts.ClusterPassword)
|
||||
}
|
||||
s.routeWG.Add(1)
|
||||
go s.connectToRoute(r, false)
|
||||
}
|
||||
|
||||
@@ -598,6 +599,7 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) {
|
||||
}
|
||||
|
||||
func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) {
|
||||
defer s.routeWG.Done()
|
||||
for s.isRunning() && rURL != nil {
|
||||
Debugf("Trying to connect to route on %s", rURL.Host)
|
||||
conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL)
|
||||
@@ -627,7 +629,10 @@ func (c *client) isSolicitedRoute() bool {
|
||||
}
|
||||
|
||||
func (s *Server) solicitRoutes() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for _, r := range s.opts.Routes {
|
||||
s.routeWG.Add(1)
|
||||
go s.connectToRoute(r, true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,6 +64,7 @@ type Server struct {
|
||||
routeListener net.Listener
|
||||
routeInfo Info
|
||||
routeInfoJSON []byte
|
||||
routeWG sync.WaitGroup // to wait on (re)connect go routines, etc..
|
||||
rcQuit chan bool
|
||||
}
|
||||
|
||||
@@ -302,6 +303,9 @@ func (s *Server) Shutdown() {
|
||||
<-s.done
|
||||
doneExpected--
|
||||
}
|
||||
|
||||
// Wait for route (re)connect go routines to be done.
|
||||
s.routeWG.Wait()
|
||||
}
|
||||
|
||||
// AcceptLoop is exported for easier testing.
|
||||
@@ -757,7 +761,7 @@ func (s *Server) GetRouteListenEndpoint() string {
|
||||
return net.JoinHostPort(host, strconv.Itoa(s.opts.ClusterPort))
|
||||
}
|
||||
|
||||
// Id returns the server's ID
|
||||
// ID returns the server's ID
|
||||
func (s *Server) ID() string {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
Reference in New Issue
Block a user