From c20afd401603d9bbda9793560a1921f74276263b Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 13 Aug 2019 19:25:22 -0600 Subject: [PATCH] [FIXED] Connection could be closed twice This was introduced in PR#930. In the first commit, the route code checked whether flushOutbound() returned false and, if so, locally unlocked and re-locked the connection's lock. Unfortunately, in the second commit (https://github.com/nats-io/nats-server/commit/a6aeed3a6b359de2be5b1d6edd8b33d48b61610b) this unlock/lock was moved into the flushOutbound() function itself. This causes the function closeConnection() to possibly unlock the connection while calling flushOutbound(). If the connection is then closed both due to, for instance, a TLS timeout and explicitly, it would result in the connection being scheduled for a reconnect (if it is an explicit gateway connection, and possibly a route). Added defensive code in Gateway so that only one outbound gateway connection is registered per gateway name. Fixed a test that was failing with newer Go versions, in which url.Parse() was fixed. Signed-off-by: Ivan Kozlovic --- server/client.go | 8 -------- server/const.go | 2 +- server/gateway.go | 28 +++++++++++++++++++++++----- server/gateway_test.go | 29 +++++++++++++++++++++++++++++ server/monitor.go | 1 + server/opts_test.go | 2 +- server/route.go | 10 +++++++++- 7 files changed, 64 insertions(+), 16 deletions(-) diff --git a/server/client.go b/server/client.go index b6de00db..992162ab 100644 --- a/server/client.go +++ b/server/client.go @@ -22,7 +22,6 @@ import ( "math/rand" "net" "regexp" - "runtime" "strings" "sync" "sync/atomic" @@ -911,13 +910,6 @@ func (c *client) handlePartialWrite(pnb net.Buffers) { // Lock must be held func (c *client) flushOutbound() bool { if c.flags.isSet(flushOutbound) { - // Another go-routine has set this and is either - // doing the write or waiting to re-acquire the - // lock post write. Release lock to give it a - // chance to complete. 
- c.mu.Unlock() - runtime.Gosched() - c.mu.Lock() return false } c.flags.set(flushOutbound) diff --git a/server/const.go b/server/const.go index 69bdaca8..20f6899a 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.0.3" + VERSION = "2.0.4" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/gateway.go b/server/gateway.go index 8cc4775b..9a816393 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -953,10 +953,23 @@ func (c *client) processGatewayInfo(info *Info) { // Register as an outbound gateway.. if we had a protocol to ack our connect, // then we should do that when process that ack. - s.registerOutboundGatewayConnection(gwName, c) - c.Noticef("Outbound gateway connection to %q (%s) registered", gwName, info.ID) - // Now that the outbound gateway is registered, we can remove from temp map. - s.removeFromTempClients(cid) + if s.registerOutboundGatewayConnection(gwName, c) { + c.Noticef("Outbound gateway connection to %q (%s) registered", gwName, info.ID) + // Now that the outbound gateway is registered, we can remove from temp map. + s.removeFromTempClients(cid) + } else { + // There was a bug that would cause a connection to possibly + // be called twice resulting in reconnection of twice the + // same outbound connection. 
The issue is fixed, but adding + // defensive code above that if we did not register this connection + // because we already have an outbound for this name, then + // close this connection (and make sure it does not try to reconnect) + c.mu.Lock() + c.flags.set(noReconnect) + c.mu.Unlock() + c.closeConnection(WrongGateway) + return + } } else if info.GatewayCmd > 0 { switch info.GatewayCmd { case gatewayCmdAllSubsStart: @@ -1476,12 +1489,17 @@ func (s *Server) registerInboundGatewayConnection(cid uint64, gwc *client) { // Register the given gateway connection (*client) in the outbound gateways // map with the given name as the key. -func (s *Server) registerOutboundGatewayConnection(name string, gwc *client) { +func (s *Server) registerOutboundGatewayConnection(name string, gwc *client) bool { s.gateway.Lock() + if _, exist := s.gateway.out[name]; exist { + s.gateway.Unlock() + return false + } s.gateway.out[name] = gwc s.gateway.outo = append(s.gateway.outo, gwc) s.gateway.orderOutboundConnectionsLocked() s.gateway.Unlock() + return true } // Returns the outbound gateway connection (*client) with the given name, diff --git a/server/gateway_test.go b/server/gateway_test.go index 9982f0ed..8c1dbcdc 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -4909,3 +4909,32 @@ func TestGatewayLogAccountInterestModeSwitch(t *testing.T) { t.Fatalf("Attempted to switch while it was already in interest mode only") } } + +func TestGatewaySingleOutbound(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Error on listen: %v", err) + } + defer l.Close() + port := l.Addr().(*net.TCPAddr).Port + + o1 := testGatewayOptionsFromToWithTLS(t, "A", "B", []string{fmt.Sprintf("nats://127.0.0.1:%d", port)}) + o1.Gateway.TLSTimeout = 0.1 + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + buf := make([]byte, 10000) + // Check for a little bit that we don't have the situation + timeout := time.Now().Add(2 * time.Second) + for 
time.Now().Before(timeout) { + n := runtime.Stack(buf, true) + index := strings.Index(string(buf[:n]), "reconnectGateway") + if index != -1 { + newIndex := strings.LastIndex(string(buf[:n]), "reconnectGateway") + if newIndex > index { + t.Fatalf("Trying to reconnect twice for the same outbound!") + } + } + time.Sleep(15 * time.Millisecond) + } +} diff --git a/server/monitor.go b/server/monitor.go index 8e1b8ce4..63848116 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -979,6 +979,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { varz
connz
routez
+ gatewayz
subsz

help diff --git a/server/opts_test.go b/server/opts_test.go index fc7e6653..5be34dce 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -1321,7 +1321,7 @@ func TestConfigureOptions(t *testing.T) { // that Visit() stops when an error is found). expectToFail([]string{"-cluster", ":", "-routes", ""}, "protocol") expectToFail([]string{"-cluster", "nats://127.0.0.1", "-routes", ""}, "port") - expectToFail([]string{"-cluster", "nats://127.0.0.1:xxx", "-routes", ""}, "integer") + expectToFail([]string{"-cluster", "nats://127.0.0.1:xxx", "-routes", ""}, "invalid port") expectToFail([]string{"-cluster", "nats://ivan:127.0.0.1:6222", "-routes", ""}, "colons") expectToFail([]string{"-cluster", "nats://ivan@127.0.0.1:6222", "-routes", ""}, "password") diff --git a/server/route.go b/server/route.go index 77f65906..be79d0e9 100644 --- a/server/route.go +++ b/server/route.go @@ -1039,7 +1039,15 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra // the lock, which could cause pingTimer to think that this // connection is stale otherwise. c.last = time.Now() - c.flushOutbound() + if !c.flushOutbound() { + // Another go-routine has set this and is either + // doing the write or waiting to re-acquire the + // lock post write. Release lock to give it a + // chance to complete. + c.mu.Unlock() + runtime.Gosched() + c.mu.Lock() + } if closed = c.flags.isSet(clearConnection); closed { break }