From 08d6aaa78fc74e78f9067eddefedca507174800e Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 23 Feb 2022 15:19:20 -0700 Subject: [PATCH] [FIXED] Gateway: connect could fail due to PING sent before CONNECT When a gateway connection was created (either accepted or initiated) the timer to fire the first PING was started at that time, which means that for an outbound connection, if the INFO coming from the other side was delayed, it was possible for the outbound to send the PING protocol before the CONNECT, which would cause the accepting side to close the connection due to a "parse" error (since the CONNECT for an inbound is supposed to be the very first protocol). Also noticed that we were not setting the auth timer like we do for the other type of connections. If authorization{timeout:} is not set, the default is 2 seconds. Signed-off-by: Ivan Kozlovic --- server/gateway.go | 10 ++++-- test/gateway_test.go | 73 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/server/gateway.go b/server/gateway.go index dbdd7153..591fcc02 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -787,6 +787,8 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) { c.Noticef("Processing inbound gateway connection") // Check if TLS is required for inbound GW connections. tlsRequired = opts.Gateway.TLSConfig != nil + // We expect a CONNECT from the accepted connection. + c.setAuthTimer(secondsToDuration(opts.Gateway.AuthTimeout)) } // Check for TLS @@ -854,9 +856,6 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) { c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite)) } - // Set the Ping timer after sending connect and info. - s.setFirstPingTimer(c) - c.mu.Unlock() // Announce ourselves again to new connections. @@ -938,6 +937,9 @@ func (c *client) processGatewayConnect(arg []byte) error { // readLoop without locking. c.gw.connected = true + // Set the Ping timer after sending connect and info. + s.setFirstPingTimer(c) + return nil } @@ -1049,6 +1051,8 @@ func (c *client) processGatewayInfo(info *Info) { c.Noticef("Outbound gateway connection to %q (%s) registered", gwName, info.ID) // Now that the outbound gateway is registered, we can remove from temp map. s.removeFromTempClients(cid) + // Set the Ping timer after sending connect and info. + s.setFirstPingTimer(c) } else { // There was a bug that would cause a connection to possibly // be called twice resulting in reconnection of twice the diff --git a/test/gateway_test.go b/test/gateway_test.go index 263c2abb..74a1e61a 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -733,3 +733,76 @@ func TestGatewayAdvertiseInCluster(t *testing.T) { sb2.Shutdown() expectNothing(t, gA) } + +func TestGatewayAuthTimeout(t *testing.T) { + for _, test := range []struct { + name string + setAuth bool // + wait time.Duration + }{ + {"auth not explicitly set", false, 2500 * time.Millisecond}, + {"auth set", true, 500 * time.Millisecond}, + } { + t.Run(test.name, func(t *testing.T) { + ob := testDefaultOptionsForGateway("B") + if test.setAuth { + ob.Gateway.AuthTimeout = 0.25 + } + sb := RunServer(ob) + defer sb.Shutdown() + + sa := createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port) + defer sa.Close() + + gAExpect := expectCommand(t, sa) + + dstInfo := checkInfoMsg(t, sa) + if dstInfo.Gateway != "B" { + t.Fatalf("Expected to connect to %q, got %q", "B", dstInfo.Gateway) + } + + // Don't send our CONNECT and we should be disconnected due to auth timeout. + time.Sleep(test.wait) + gAExpect(errRe) + expectDisconnect(t, sa) + }) + } +} + +func TestGatewayFirstPingGoesAfterConnect(t *testing.T) { + ob := testDefaultOptionsForGateway("B") + // For this test, we want the first ping to NOT be disabled. + ob.DisableShortFirstPing = false + // Also, for this test increase auth_timeout so that it does not disconnect + // while checking... + ob.Gateway.AuthTimeout = 10.0 + sb := RunServer(ob) + defer sb.Shutdown() + + sa := createGatewayConn(t, ob.Gateway.Host, ob.Gateway.Port) + defer sa.Close() + + gASend, gAExpect := sendCommand(t, sa), expectCommand(t, sa) + dstInfo := checkInfoMsg(t, sa) + if dstInfo.Gateway != "B" { + t.Fatalf("Expected to connect to %q, got %q", "B", dstInfo.Gateway) + } + + // Wait and we should not be receiving a PING from server B until we send + // a CONNECT. We need to wait for more than the initial PING, so cannot + // use expectNothing() helper here. + buf := make([]byte, 256) + sa.SetReadDeadline(time.Now().Add(2 * time.Second)) + if n, err := sa.Read(buf); err == nil { + t.Fatalf("Expected nothing, got %s", buf[:n]) + } + + // Now send connect and INFO + cs := fmt.Sprintf("CONNECT {\"verbose\":%v,\"pedantic\":%v,\"tls_required\":%v,\"gateway\":%q}\r\n", + false, false, false, "A") + gASend(cs) + gASend(fmt.Sprintf("INFO {\"gateway\":%q}\r\n", "A")) + + // We should get the first PING + gAExpect(pingRe) +}