diff --git a/server/client.go b/server/client.go index bb93d71d..fe13b948 100644 --- a/server/client.go +++ b/server/client.go @@ -124,17 +124,18 @@ const ( // Some client state represented as flags const ( - connectReceived clientFlag = 1 << iota // The CONNECT proto has been received - infoReceived // The INFO protocol has been received - firstPongSent // The first PONG has been sent - handshakeComplete // For TLS clients, indicate that the handshake is complete - flushOutbound // Marks client as having a flushOutbound call in progress. - noReconnect // Indicate that on close, this connection should not attempt a reconnect - closeConnection // Marks that closeConnection has already been called. - connMarkedClosed // Marks that markConnAsClosed has already been called. - writeLoopStarted // Marks that the writeLoop has been started. - skipFlushOnClose // Marks that flushOutbound() should not be called on connection close. - expectConnect // Marks if this connection is expected to send a CONNECT + connectReceived clientFlag = 1 << iota // The CONNECT proto has been received + infoReceived // The INFO protocol has been received + firstPongSent // The first PONG has been sent + handshakeComplete // For TLS clients, indicate that the handshake is complete + flushOutbound // Marks client as having a flushOutbound call in progress. + noReconnect // Indicate that on close, this connection should not attempt a reconnect + closeConnection // Marks that closeConnection has already been called. + connMarkedClosed // Marks that markConnAsClosed has already been called. + writeLoopStarted // Marks that the writeLoop has been started. + skipFlushOnClose // Marks that flushOutbound() should not be called on connection close. + expectConnect // Marks if this connection is expected to send a CONNECT + connectProcessFinished // Marks if this connection has finished the connect process. ) // set the flag (would be equivalent to set the boolean to true) diff --git a/server/leafnode.go b/server/leafnode.go index ef63c0fc..1103c899 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -916,7 +916,7 @@ func (c *client) processLeafnodeInfo(info *Info) { s = c.srv c.mu.Unlock() - finishConnect := info.ConnectInfo && !firstINFO + finishConnect := info.ConnectInfo if resumeConnect && s != nil { s.leafNodeResumeConnectProcess(c) if !info.InfoOnConnect { @@ -1364,7 +1364,7 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) { ln.mu.Lock() skip := sub.origin != nil && string(sub.origin) == ln.remoteCluster() // do not skip on !ln.canSubscribe(string(sub.subject)) - // Given allow:foo, > would be rejected. For leaf nodes filtering is done on the (soliciting) end ( + // Given allow:foo, > would be rejected. For leaf nodes filtering is done on the (soliciting) end. ln.mu.Unlock() if skip { continue @@ -2124,6 +2124,8 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot return preBuf, 0, nil } +const connectProcessTimeout = 2 * time.Second + // This is invoked for remote LEAF remote connections after processing the INFO // protocol. This will do the TLS handshake (if needed be) func (s *Server) leafNodeResumeConnectProcess(c *client) { @@ -2175,26 +2177,27 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) { // Spin up the write loop. s.startGoRoutine(func() { c.writeLoop() }) - cid := c.cid - c.mu.Unlock() - c.Debugf("Remote leafnode connect msg sent") - // timeout leafNodeFinishConnectProcess - time.AfterFunc(s.getOpts().PingInterval, func() { - s.mu.Lock() - // check if addLeafNodeConnection was called by leafNodeFinishConnectProcess - _, found := s.leafs[cid] - s.mu.Unlock() - if !found { - c.mu.Lock() - closed := c.isClosed() + c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() { + c.mu.Lock() + // check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess + if !c.flags.setIfNotSet(connectProcessFinished) { c.mu.Unlock() - if !closed { - c.sendErrAndDebug("Stale Leaf Node Connection - Closing") - c.closeConnection(StaleConnection) - } + return + } + if !c.ping.tmr.Stop() { + <-c.ping.tmr.C + c.ping.tmr = nil + } + closed := c.isClosed() + c.mu.Unlock() + if !closed { + c.sendErrAndDebug("Stale Leaf Node Connection - Closing") + c.closeConnection(StaleConnection) } }) + c.mu.Unlock() + c.Debugf("Remote leafnode connect msg sent") } // This is invoked for remote LEAF remote connections after processing the INFO @@ -2202,6 +2205,10 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) { // This will send LS+ the CONNECT protocol and register the leaf node. func (s *Server) leafNodeFinishConnectProcess(c *client) { c.mu.Lock() + if !c.flags.setIfNotSet(connectProcessFinished) { + c.mu.Unlock() + return + } if c.isClosed() { c.mu.Unlock() s.removeLeafNodeConnection(c) @@ -2215,6 +2222,11 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) { // Capture account before releasing lock acc := c.acc + // cancel connectProcessTimeout + if !c.ping.tmr.Stop() { + <-c.ping.tmr.C + c.ping.tmr = nil + } c.mu.Unlock() // Make sure we register with the account here. diff --git a/server/server.go b/server/server.go index c957297c..23fa6b79 100644 --- a/server/server.go +++ b/server/server.go @@ -86,8 +86,8 @@ type Info struct { Import *SubjectPermission `json:"import,omitempty"` Export *SubjectPermission `json:"export,omitempty"` LNOC bool `json:"lnoc,omitempty"` - InfoOnConnect bool `json:"info_on_connect,omitempty"` // When true the server will respond connect to with an INFO - ConnectInfo bool `json:"connect_info,omitempty"` // When true this is the servers response to CONNECT + InfoOnConnect bool `json:"info_on_connect,omitempty"` // When true the server will respond to CONNECT with an INFO + ConnectInfo bool `json:"connect_info,omitempty"` // When true this is the server INFO response to CONNECT // Gateways Specific Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO)