From 85b3f8a7fd7459feda1d14cd8aeedc38a811bea1 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 4 Mar 2022 19:55:07 -0700 Subject: [PATCH] Gateways: data race when setting first ping timer This was introduced when fixing #2881. The call to setFirstPingTimer needed to be done under the client's lock. Moved setFirstPingTimer from a server receiver to a client receiver. The only reason it was a server receiver is because we need the server options, but c.srv is always set when invoking this function, so we will get the server from c.srv in that function now. Related to #2881 Signed-off-by: Ivan Kozlovic --- server/client.go | 31 ++++++++++++++++++++++++++++++- server/gateway.go | 16 ++++++---------- server/leafnode.go | 4 ++-- server/route.go | 2 +- server/server.go | 26 -------------------------- 5 files changed, 39 insertions(+), 40 deletions(-) diff --git a/server/client.go b/server/client.go index ce2c0ee0..5d372cd4 100644 --- a/server/client.go +++ b/server/client.go @@ -1728,7 +1728,7 @@ func (c *client) processConnect(arg []byte) error { c.rtt = computeRTT(c.start) if c.srv != nil { c.clearPingTimer() - c.srv.setFirstPingTimer(c) + c.setFirstPingTimer() } } kind := c.kind @@ -5318,3 +5318,32 @@ func (c *client) Warnf(format string, v ...interface{}) { format = fmt.Sprintf("%s - %s", c, format) c.srv.Warnf(format, v...) } + +// Set the very first PING to a lower interval to capture the initial RTT. +// After that the PING interval will be set to the user defined value. +// Client lock should be held. +func (c *client) setFirstPingTimer() { + s := c.srv + if s == nil { + return + } + opts := s.getOpts() + d := opts.PingInterval + + if !opts.DisableShortFirstPing { + if c.kind != CLIENT { + if d > firstPingInterval { + d = firstPingInterval + } + if c.kind == GATEWAY { + d = adjustPingIntervalForGateway(d) + } + } else if d > firstClientPingInterval { + d = firstClientPingInterval + } + } + // We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s. + addDelay := rand.Int63n(int64(d / 5)) + d += time.Duration(addDelay) + c.ping.tmr = time.AfterFunc(d, c.processPingTimer) +} diff --git a/server/gateway.go b/server/gateway.go index 591fcc02..ce5b7e6d 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -928,17 +928,11 @@ func (c *client) processGatewayConnect(arg []byte) error { return ErrWrongGateway } - // For a gateway connection, c.gw is guaranteed not to be nil here - // (created in createGateway() and never set to nil). - // For inbound connections, it is important to know in the parser - // if the CONNECT was received first, so we use this boolean (as - // opposed to client.flags that require locking) to indicate that - // CONNECT was processed. Again, this boolean is set/read in the - // readLoop without locking. + c.mu.Lock() c.gw.connected = true - // Set the Ping timer after sending connect and info. - s.setFirstPingTimer(c) + c.setFirstPingTimer() + c.mu.Unlock() return nil } @@ -1052,7 +1046,9 @@ func (c *client) processGatewayInfo(info *Info) { // Now that the outbound gateway is registered, we can remove from temp map. s.removeFromTempClients(cid) // Set the Ping timer after sending connect and info. - s.setFirstPingTimer(c) + c.mu.Lock() + c.setFirstPingTimer() + c.mu.Unlock() } else { // There was a bug that would cause a connection to possibly // be called twice resulting in reconnection of twice the diff --git a/server/leafnode.go b/server/leafnode.go index e93eb600..a129733c 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1402,7 +1402,7 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro } // Set the Ping timer - s.setFirstPingTimer(c) + c.setFirstPingTimer() // If we received pub deny permissions from the other end, merge with existing ones. c.mergeDenyPermissions(pub, proto.DenyPub) @@ -2526,7 +2526,7 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) { c.mu.Lock() closed := c.isClosed() if !closed { - s.setFirstPingTimer(c) + c.setFirstPingTimer() } c.mu.Unlock() if closed { diff --git a/server/route.go b/server/route.go index 0857b9a2..583c9881 100644 --- a/server/route.go +++ b/server/route.go @@ -1346,7 +1346,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { } // Set the Ping timer - s.setFirstPingTimer(c) + c.setFirstPingTimer() // For routes, the "client" is added to s.routes only when processing // the INFO protocol, that is much later. diff --git a/server/server.go b/server/server.go index a3fb0c65..3be3f662 100644 --- a/server/server.go +++ b/server/server.go @@ -3619,32 +3619,6 @@ func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool { return false } -// Invoked for route, leaf and gateway connections. Set the very first -// PING to a lower interval to capture the initial RTT. -// After that the PING interval will be set to the user defined value. -// Client lock should be held. -func (s *Server) setFirstPingTimer(c *client) { - opts := s.getOpts() - d := opts.PingInterval - - if !opts.DisableShortFirstPing { - if c.kind != CLIENT { - if d > firstPingInterval { - d = firstPingInterval - } - if c.kind == GATEWAY { - d = adjustPingIntervalForGateway(d) - } - } else if d > firstClientPingInterval { - d = firstClientPingInterval - } - } - // We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s. - addDelay := rand.Int63n(int64(d / 5)) - d += time.Duration(addDelay) - c.ping.tmr = time.AfterFunc(d, c.processPingTimer) -} - func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta int32) { s.updateRouteSubscriptionMap(acc, sub, delta) if s.gateway.enabled {