Fixed accept loops that could leave connections opened

This was discovered with the test TestLeafNodeWithGatewaysServerRestart
that was sometimes failing. Investigation showed that when cluster B
was shutdown, one of the server on A that had a connection from B
that just broke tried to reconnect (as part of reconnect retries of
implicit gateways) to a server in B that was in the process of shuting down.
The connection had been accepted but createGateway not called because
the server's running boolean had been set to false as part of the shutdown.
However, the connection was not closed so the server on A had a valid
connection to a dead server from cluster B. When the B cluster (now single
server) was restarted and a LeafNode connection connected to it, then
the gateway from B to A was created, that server on A did not create outbound
connection to that B server because it already had one (the zombie one).

So this PR strengthens the starting of accept loops and also make sure
that if a connection (all type of connections) is not accepted because
the server is shuting down, that connection is properly closed.

Since all accept loops had almost same code, made a generic function
that accept functions to call specific create connection functions.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2020-07-05 11:36:56 -06:00
parent 33deee8a64
commit 9288283d90
8 changed files with 342 additions and 219 deletions

View File

@@ -342,18 +342,9 @@ func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
}
}
// This is the leafnode's accept loop. This runs as a go-routine.
// The listen specification is resolved (if use of random port),
// then a listener is started. After that, this routine enters
// a loop (until the server is shutdown) accepting incoming
// leaf node connections from remote servers.
func (s *Server) leafNodeAcceptLoop(ch chan struct{}) {
defer func() {
if ch != nil {
close(ch)
}
}()
// This starts the leafnode accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
func (s *Server) startLeafNodeAcceptLoop() {
// Snapshot server options.
opts := s.getOpts()
@@ -362,9 +353,15 @@ func (s *Server) leafNodeAcceptLoop(ch chan struct{}) {
port = 0
}
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
l, e := net.Listen("tcp", hp)
if e != nil {
s.mu.Unlock()
s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
return
}
@@ -372,7 +369,6 @@ func (s *Server) leafNodeAcceptLoop(ch chan struct{}) {
s.Noticef("Listening for leafnode connections on %s",
net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
s.mu.Lock()
tlsRequired := opts.LeafNode.TLSConfig != nil
tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
info := Info{
@@ -426,28 +422,8 @@ func (s *Server) leafNodeAcceptLoop(ch chan struct{}) {
if warn {
s.Warnf(leafnodeTLSInsecureWarning)
}
go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil) }, nil)
s.mu.Unlock()
// Let them know we are up
close(ch)
ch = nil
tmpDelay := ACCEPT_MIN_SLEEP
for s.isRunning() {
conn, err := l.Accept()
if err != nil {
tmpDelay = s.acceptError("LeafNode", err, tmpDelay)
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
s.startGoRoutine(func() {
s.createLeafNode(conn, nil)
s.grWG.Done()
})
}
s.Debugf("Leafnode accept loop exiting..")
s.done <- true
}
// RegEx to match a creds file with user JWT and Seed.