mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Gateways: data race when setting first ping timer
This was introduced when fixing #2881. The call to setFirstPingTimer needed to be done under the client's lock. Moved setFirstPingTimer from a server receiver to a client receiver. The only reason it was a server receiver is because we need the server options, but c.srv is always set when invoking this function, so we will get the server from c.srv in that function now. Related to #2881 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1728,7 +1728,7 @@ func (c *client) processConnect(arg []byte) error {
|
||||
c.rtt = computeRTT(c.start)
|
||||
if c.srv != nil {
|
||||
c.clearPingTimer()
|
||||
c.srv.setFirstPingTimer(c)
|
||||
c.setFirstPingTimer()
|
||||
}
|
||||
}
|
||||
kind := c.kind
|
||||
@@ -5318,3 +5318,32 @@ func (c *client) Warnf(format string, v ...interface{}) {
|
||||
format = fmt.Sprintf("%s - %s", c, format)
|
||||
c.srv.Warnf(format, v...)
|
||||
}
|
||||
|
||||
// Set the very first PING to a lower interval to capture the initial RTT.
|
||||
// After that the PING interval will be set to the user defined value.
|
||||
// Client lock should be held.
|
||||
func (c *client) setFirstPingTimer() {
|
||||
s := c.srv
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
opts := s.getOpts()
|
||||
d := opts.PingInterval
|
||||
|
||||
if !opts.DisableShortFirstPing {
|
||||
if c.kind != CLIENT {
|
||||
if d > firstPingInterval {
|
||||
d = firstPingInterval
|
||||
}
|
||||
if c.kind == GATEWAY {
|
||||
d = adjustPingIntervalForGateway(d)
|
||||
}
|
||||
} else if d > firstClientPingInterval {
|
||||
d = firstClientPingInterval
|
||||
}
|
||||
}
|
||||
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
|
||||
addDelay := rand.Int63n(int64(d / 5))
|
||||
d += time.Duration(addDelay)
|
||||
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
|
||||
}
|
||||
|
||||
@@ -928,17 +928,11 @@ func (c *client) processGatewayConnect(arg []byte) error {
|
||||
return ErrWrongGateway
|
||||
}
|
||||
|
||||
// For a gateway connection, c.gw is guaranteed not to be nil here
|
||||
// (created in createGateway() and never set to nil).
|
||||
// For inbound connections, it is important to know in the parser
|
||||
// if the CONNECT was received first, so we use this boolean (as
|
||||
// opposed to client.flags that require locking) to indicate that
|
||||
// CONNECT was processed. Again, this boolean is set/read in the
|
||||
// readLoop without locking.
|
||||
c.mu.Lock()
|
||||
c.gw.connected = true
|
||||
|
||||
// Set the Ping timer after sending connect and info.
|
||||
s.setFirstPingTimer(c)
|
||||
c.setFirstPingTimer()
|
||||
c.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1052,7 +1046,9 @@ func (c *client) processGatewayInfo(info *Info) {
|
||||
// Now that the outbound gateway is registered, we can remove from temp map.
|
||||
s.removeFromTempClients(cid)
|
||||
// Set the Ping timer after sending connect and info.
|
||||
s.setFirstPingTimer(c)
|
||||
c.mu.Lock()
|
||||
c.setFirstPingTimer()
|
||||
c.mu.Unlock()
|
||||
} else {
|
||||
// There was a bug that would cause a connection to possibly
|
||||
// be called twice resulting in reconnection of twice the
|
||||
|
||||
@@ -1402,7 +1402,7 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
}
|
||||
|
||||
// Set the Ping timer
|
||||
s.setFirstPingTimer(c)
|
||||
c.setFirstPingTimer()
|
||||
|
||||
// If we received pub deny permissions from the other end, merge with existing ones.
|
||||
c.mergeDenyPermissions(pub, proto.DenyPub)
|
||||
@@ -2526,7 +2526,7 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) {
|
||||
c.mu.Lock()
|
||||
closed := c.isClosed()
|
||||
if !closed {
|
||||
s.setFirstPingTimer(c)
|
||||
c.setFirstPingTimer()
|
||||
}
|
||||
c.mu.Unlock()
|
||||
if closed {
|
||||
|
||||
@@ -1346,7 +1346,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
}
|
||||
|
||||
// Set the Ping timer
|
||||
s.setFirstPingTimer(c)
|
||||
c.setFirstPingTimer()
|
||||
|
||||
// For routes, the "client" is added to s.routes only when processing
|
||||
// the INFO protocol, that is much later.
|
||||
|
||||
@@ -3619,32 +3619,6 @@ func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Invoked for route, leaf and gateway connections. Set the very first
|
||||
// PING to a lower interval to capture the initial RTT.
|
||||
// After that the PING interval will be set to the user defined value.
|
||||
// Client lock should be held.
|
||||
func (s *Server) setFirstPingTimer(c *client) {
|
||||
opts := s.getOpts()
|
||||
d := opts.PingInterval
|
||||
|
||||
if !opts.DisableShortFirstPing {
|
||||
if c.kind != CLIENT {
|
||||
if d > firstPingInterval {
|
||||
d = firstPingInterval
|
||||
}
|
||||
if c.kind == GATEWAY {
|
||||
d = adjustPingIntervalForGateway(d)
|
||||
}
|
||||
} else if d > firstClientPingInterval {
|
||||
d = firstClientPingInterval
|
||||
}
|
||||
}
|
||||
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
|
||||
addDelay := rand.Int63n(int64(d / 5))
|
||||
d += time.Duration(addDelay)
|
||||
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
|
||||
}
|
||||
|
||||
func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta int32) {
|
||||
s.updateRouteSubscriptionMap(acc, sub, delta)
|
||||
if s.gateway.enabled {
|
||||
|
||||
Reference in New Issue
Block a user