Merge pull request #1414 from nats-io/save_leafnode_conn_in_temp_map_until_connect

[FIXED] Possible stall on shutdown with leafnode setup
This commit is contained in:
Ivan Kozlovic
2020-05-22 17:14:28 -06:00
committed by GitHub
2 changed files with 60 additions and 0 deletions

View File

@@ -794,6 +794,14 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
}
}
// Keep track in case server is shutdown before we can successfully register.
if !s.addToTempClients(c.cid, c) {
c.mu.Unlock()
c.setNoReconnect()
c.closeConnection(ServerShutdown)
return nil
}
// Spin up the read loop.
s.startGoRoutine(func() { c.readLoop() })
@@ -955,6 +963,7 @@ func (s *Server) addLeafNodeConnection(c *client) {
s.mu.Lock()
s.leafs[cid] = c
s.mu.Unlock()
s.removeFromTempClients(cid)
}
func (s *Server) removeLeafNodeConnection(c *client) {
@@ -964,6 +973,7 @@ func (s *Server) removeLeafNodeConnection(c *client) {
s.mu.Lock()
delete(s.leafs, cid)
s.mu.Unlock()
s.removeFromTempClients(cid)
}
type leafConnectInfo struct {

View File

@@ -1436,3 +1436,53 @@ func TestLeafNodeHubWithGateways(t *testing.T) {
t.Fatalf("Unexpected reply: %q", msg.Data)
}
}
func TestLeafNodeTmpClients(t *testing.T) {
ao := DefaultOptions()
ao.LeafNode.Host = "127.0.0.1"
ao.LeafNode.Port = -1
a := RunServer(ao)
defer a.Shutdown()
c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", ao.LeafNode.Port))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer c.Close()
// Read info
br := bufio.NewReader(c)
br.ReadLine()
checkTmp := func(expected int) {
t.Helper()
checkFor(t, time.Second, 15*time.Millisecond, func() error {
a.grMu.Lock()
l := len(a.grTmpClients)
a.grMu.Unlock()
if l != expected {
return fmt.Errorf("Expected tmp map to have %v entries, got %v", expected, l)
}
return nil
})
}
checkTmp(1)
// Close client and wait check that it is removed.
c.Close()
checkTmp(0)
// Check with normal leafnode connection that once connected,
// the tmp map is also emptied.
bo := DefaultOptions()
bo.LeafNode.ReconnectInterval = 5 * time.Millisecond
u, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ao.LeafNode.Port))
if err != nil {
t.Fatalf("Error creating url: %v", err)
}
bo.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
b := RunServer(bo)
defer a.Shutdown()
checkLeafNodeConnected(t, b)
checkTmp(0)
}