Merge pull request #2902 from nats-io/gw_fix_race_set_first_ping_timer

Gateways: data race when setting first ping timer
This commit is contained in:
Ivan Kozlovic
2022-03-06 10:18:50 -07:00
committed by GitHub
5 changed files with 39 additions and 40 deletions

View File

@@ -1728,7 +1728,7 @@ func (c *client) processConnect(arg []byte) error {
c.rtt = computeRTT(c.start)
if c.srv != nil {
c.clearPingTimer()
c.srv.setFirstPingTimer(c)
c.setFirstPingTimer()
}
}
kind := c.kind
@@ -5318,3 +5318,32 @@ func (c *client) Warnf(format string, v ...interface{}) {
format = fmt.Sprintf("%s - %s", c, format)
c.srv.Warnf(format, v...)
}
// Set the very first PING to a lower interval to capture the initial RTT.
// After that the PING interval will be set to the user defined value.
// Client lock should be held.
func (c *client) setFirstPingTimer() {
s := c.srv
if s == nil {
return
}
opts := s.getOpts()
d := opts.PingInterval
if !opts.DisableShortFirstPing {
if c.kind != CLIENT {
if d > firstPingInterval {
d = firstPingInterval
}
if c.kind == GATEWAY {
d = adjustPingIntervalForGateway(d)
}
} else if d > firstClientPingInterval {
d = firstClientPingInterval
}
}
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
addDelay := rand.Int63n(int64(d / 5))
d += time.Duration(addDelay)
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}

View File

@@ -928,17 +928,11 @@ func (c *client) processGatewayConnect(arg []byte) error {
return ErrWrongGateway
}
// For a gateway connection, c.gw is guaranteed not to be nil here
// (created in createGateway() and never set to nil).
// For inbound connections, it is important to know in the parser
// if the CONNECT was received first, so we use this boolean (as
// opposed to client.flags that require locking) to indicate that
// CONNECT was processed. Again, this boolean is set/read in the
// readLoop without locking.
c.mu.Lock()
c.gw.connected = true
// Set the Ping timer after sending connect and info.
s.setFirstPingTimer(c)
c.setFirstPingTimer()
c.mu.Unlock()
return nil
}
@@ -1052,7 +1046,9 @@ func (c *client) processGatewayInfo(info *Info) {
// Now that the outbound gateway is registered, we can remove from temp map.
s.removeFromTempClients(cid)
// Set the Ping timer after sending connect and info.
s.setFirstPingTimer(c)
c.mu.Lock()
c.setFirstPingTimer()
c.mu.Unlock()
} else {
// There was a bug that would cause a connection to possibly
// be called twice resulting in reconnection of twice the

View File

@@ -1402,7 +1402,7 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
}
// Set the Ping timer
s.setFirstPingTimer(c)
c.setFirstPingTimer()
// If we received pub deny permissions from the other end, merge with existing ones.
c.mergeDenyPermissions(pub, proto.DenyPub)
@@ -2526,7 +2526,7 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) {
c.mu.Lock()
closed := c.isClosed()
if !closed {
s.setFirstPingTimer(c)
c.setFirstPingTimer()
}
c.mu.Unlock()
if closed {

View File

@@ -1346,7 +1346,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
}
// Set the Ping timer
s.setFirstPingTimer(c)
c.setFirstPingTimer()
// For routes, the "client" is added to s.routes only when processing
// the INFO protocol, that is much later.

View File

@@ -3619,32 +3619,6 @@ func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool {
return false
}
// Invoked for route, leaf and gateway connections. Set the very first
// PING to a lower interval to capture the initial RTT.
// After that the PING interval will be set to the user defined value.
// Client lock should be held.
func (s *Server) setFirstPingTimer(c *client) {
opts := s.getOpts()
d := opts.PingInterval
if !opts.DisableShortFirstPing {
if c.kind != CLIENT {
if d > firstPingInterval {
d = firstPingInterval
}
if c.kind == GATEWAY {
d = adjustPingIntervalForGateway(d)
}
} else if d > firstClientPingInterval {
d = firstClientPingInterval
}
}
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
addDelay := rand.Int63n(int64(d / 5))
d += time.Duration(addDelay)
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}
func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta int32) {
s.updateRouteSubscriptionMap(acc, sub, delta)
if s.gateway.enabled {