Ensure that leafNodeFinishConnectProcess is only executed once.

incorporate review comments

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2021-04-09 14:56:13 -04:00
committed by Matthias Hanel
parent 5d1f36dd17
commit f7a772f097
3 changed files with 44 additions and 31 deletions

View File

@@ -124,17 +124,18 @@ const (
// Some client state represented as flags
const (
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
infoReceived // The INFO protocol has been received
firstPongSent // The first PONG has been sent
handshakeComplete // For TLS clients, indicate that the handshake is complete
flushOutbound // Marks client as having a flushOutbound call in progress.
noReconnect // Indicate that on close, this connection should not attempt a reconnect
closeConnection // Marks that closeConnection has already been called.
connMarkedClosed // Marks that markConnAsClosed has already been called.
writeLoopStarted // Marks that the writeLoop has been started.
skipFlushOnClose // Marks that flushOutbound() should not be called on connection close.
expectConnect // Marks if this connection is expected to send a CONNECT
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
infoReceived // The INFO protocol has been received
firstPongSent // The first PONG has been sent
handshakeComplete // For TLS clients, indicate that the handshake is complete
flushOutbound // Marks client as having a flushOutbound call in progress.
noReconnect // Indicate that on close, this connection should not attempt a reconnect
closeConnection // Marks that closeConnection has already been called.
connMarkedClosed // Marks that markConnAsClosed has already been called.
writeLoopStarted // Marks that the writeLoop has been started.
skipFlushOnClose // Marks that flushOutbound() should not be called on connection close.
expectConnect // Marks if this connection is expected to send a CONNECT
connectProcessFinished // Marks if this connection has finished the connect process.
)
// set the flag (would be equivalent to set the boolean to true)

View File

@@ -916,7 +916,7 @@ func (c *client) processLeafnodeInfo(info *Info) {
s = c.srv
c.mu.Unlock()
finishConnect := info.ConnectInfo && !firstINFO
finishConnect := info.ConnectInfo
if resumeConnect && s != nil {
s.leafNodeResumeConnectProcess(c)
if !info.InfoOnConnect {
@@ -1364,7 +1364,7 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
ln.mu.Lock()
skip := sub.origin != nil && string(sub.origin) == ln.remoteCluster()
// do not skip on !ln.canSubscribe(string(sub.subject))
// Given allow:foo, > would be rejected. For leaf nodes filtering is done on the (soliciting) end (
// Given allow:foo, > would be rejected. For leaf nodes filtering is done on the (soliciting) end.
ln.mu.Unlock()
if skip {
continue
@@ -2124,6 +2124,8 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
return preBuf, 0, nil
}
const connectProcessTimeout = 2 * time.Second
// This is invoked for remote LEAF remote connections after processing the INFO
// protocol. This will do the TLS handshake (if needed be)
func (s *Server) leafNodeResumeConnectProcess(c *client) {
@@ -2175,26 +2177,27 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) {
// Spin up the write loop.
s.startGoRoutine(func() { c.writeLoop() })
cid := c.cid
c.mu.Unlock()
c.Debugf("Remote leafnode connect msg sent")
// timeout leafNodeFinishConnectProcess
time.AfterFunc(s.getOpts().PingInterval, func() {
s.mu.Lock()
// check if addLeafNodeConnection was called by leafNodeFinishConnectProcess
_, found := s.leafs[cid]
s.mu.Unlock()
if !found {
c.mu.Lock()
closed := c.isClosed()
c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
c.mu.Lock()
// check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
if !c.flags.setIfNotSet(connectProcessFinished) {
c.mu.Unlock()
if !closed {
c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
c.closeConnection(StaleConnection)
}
return
}
if !c.ping.tmr.Stop() {
<-c.ping.tmr.C
c.ping.tmr = nil
}
closed := c.isClosed()
c.mu.Unlock()
if !closed {
c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
c.closeConnection(StaleConnection)
}
})
c.mu.Unlock()
c.Debugf("Remote leafnode connect msg sent")
}
// This is invoked for remote LEAF remote connections after processing the INFO
@@ -2202,6 +2205,10 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) {
// This will send LS+ the CONNECT protocol and register the leaf node.
func (s *Server) leafNodeFinishConnectProcess(c *client) {
c.mu.Lock()
if !c.flags.setIfNotSet(connectProcessFinished) {
c.mu.Unlock()
return
}
if c.isClosed() {
c.mu.Unlock()
s.removeLeafNodeConnection(c)
@@ -2215,6 +2222,11 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) {
// Capture account before releasing lock
acc := c.acc
// cancel connectProcessTimeout
if !c.ping.tmr.Stop() {
<-c.ping.tmr.C
c.ping.tmr = nil
}
c.mu.Unlock()
// Make sure we register with the account here.

View File

@@ -86,8 +86,8 @@ type Info struct {
Import *SubjectPermission `json:"import,omitempty"`
Export *SubjectPermission `json:"export,omitempty"`
LNOC bool `json:"lnoc,omitempty"`
InfoOnConnect bool `json:"info_on_connect,omitempty"` // When true the server will respond connect to with an INFO
ConnectInfo bool `json:"connect_info,omitempty"` // When true this is the servers response to CONNECT
InfoOnConnect bool `json:"info_on_connect,omitempty"` // When true the server will respond to CONNECT with an INFO
ConnectInfo bool `json:"connect_info,omitempty"` // When true this is the server INFO response to CONNECT
// Gateways Specific
Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO)