diff --git a/server/client.go b/server/client.go index b6de00db..992162ab 100644 --- a/server/client.go +++ b/server/client.go @@ -22,7 +22,6 @@ import ( "math/rand" "net" "regexp" - "runtime" "strings" "sync" "sync/atomic" @@ -911,13 +910,6 @@ func (c *client) handlePartialWrite(pnb net.Buffers) { // Lock must be held func (c *client) flushOutbound() bool { if c.flags.isSet(flushOutbound) { - // Another go-routine has set this and is either - // doing the write or waiting to re-acquire the - // lock post write. Release lock to give it a - // chance to complete. - c.mu.Unlock() - runtime.Gosched() - c.mu.Lock() return false } c.flags.set(flushOutbound) diff --git a/server/const.go b/server/const.go index 69bdaca8..20f6899a 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.0.3" + VERSION = "2.0.4" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/gateway.go b/server/gateway.go index 8cc4775b..9a816393 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -953,10 +953,23 @@ func (c *client) processGatewayInfo(info *Info) { // Register as an outbound gateway.. if we had a protocol to ack our connect, // then we should do that when process that ack. - s.registerOutboundGatewayConnection(gwName, c) - c.Noticef("Outbound gateway connection to %q (%s) registered", gwName, info.ID) - // Now that the outbound gateway is registered, we can remove from temp map. - s.removeFromTempClients(cid) + if s.registerOutboundGatewayConnection(gwName, c) { + c.Noticef("Outbound gateway connection to %q (%s) registered", gwName, info.ID) + // Now that the outbound gateway is registered, we can remove from temp map. + s.removeFromTempClients(cid) + } else { + // There was a bug that would cause a connection to possibly + // be called twice resulting in reconnection of twice the + // same outbound connection. The issue is fixed, but adding + // defensive code above that if we did not register this connection + // because we already have an outbound for this name, then + // close this connection (and make sure it does not try to reconnect) + c.mu.Lock() + c.flags.set(noReconnect) + c.mu.Unlock() + c.closeConnection(WrongGateway) + return + } } else if info.GatewayCmd > 0 { switch info.GatewayCmd { case gatewayCmdAllSubsStart: @@ -1476,12 +1489,17 @@ func (s *Server) registerInboundGatewayConnection(cid uint64, gwc *client) { // Register the given gateway connection (*client) in the outbound gateways // map with the given name as the key. -func (s *Server) registerOutboundGatewayConnection(name string, gwc *client) { +func (s *Server) registerOutboundGatewayConnection(name string, gwc *client) bool { s.gateway.Lock() + if _, exist := s.gateway.out[name]; exist { + s.gateway.Unlock() + return false + } s.gateway.out[name] = gwc s.gateway.outo = append(s.gateway.outo, gwc) s.gateway.orderOutboundConnectionsLocked() s.gateway.Unlock() + return true } // Returns the outbound gateway connection (*client) with the given name, diff --git a/server/gateway_test.go b/server/gateway_test.go index 9982f0ed..8c1dbcdc 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -4909,3 +4909,32 @@ func TestGatewayLogAccountInterestModeSwitch(t *testing.T) { t.Fatalf("Attempted to switch while it was already in interest mode only") } } + +func TestGatewaySingleOutbound(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Error on listen: %v", err) + } + defer l.Close() + port := l.Addr().(*net.TCPAddr).Port + + o1 := testGatewayOptionsFromToWithTLS(t, "A", "B", []string{fmt.Sprintf("nats://127.0.0.1:%d", port)}) + o1.Gateway.TLSTimeout = 0.1 + s1 := runGatewayServer(o1) + defer s1.Shutdown() + + buf := make([]byte, 10000) + // Check for a little bit that we don't have the situation + timeout := time.Now().Add(2 * time.Second) + for time.Now().Before(timeout) { + n := runtime.Stack(buf, true) + index := strings.Index(string(buf[:n]), "reconnectGateway") + if index != -1 { + newIndex := strings.LastIndex(string(buf[:n]), "reconnectGateway") + if newIndex > index { + t.Fatalf("Trying to reconnect twice for the same outbound!") + } + } + time.Sleep(15 * time.Millisecond) + } +} diff --git a/server/monitor.go b/server/monitor.go index 8e1b8ce4..63848116 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -979,6 +979,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { varz
connz
routez
+ gatewayz
subsz

help diff --git a/server/opts_test.go b/server/opts_test.go index fc7e6653..5be34dce 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -1321,7 +1321,7 @@ func TestConfigureOptions(t *testing.T) { // that Visit() stops when an error is found). expectToFail([]string{"-cluster", ":", "-routes", ""}, "protocol") expectToFail([]string{"-cluster", "nats://127.0.0.1", "-routes", ""}, "port") - expectToFail([]string{"-cluster", "nats://127.0.0.1:xxx", "-routes", ""}, "integer") + expectToFail([]string{"-cluster", "nats://127.0.0.1:xxx", "-routes", ""}, "invalid port") expectToFail([]string{"-cluster", "nats://ivan:127.0.0.1:6222", "-routes", ""}, "colons") expectToFail([]string{"-cluster", "nats://ivan@127.0.0.1:6222", "-routes", ""}, "password") diff --git a/server/route.go b/server/route.go index 77f65906..be79d0e9 100644 --- a/server/route.go +++ b/server/route.go @@ -1039,7 +1039,15 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra // the lock, which could cause pingTimer to think that this // connection is stale otherwise. c.last = time.Now() - c.flushOutbound() + if !c.flushOutbound() { + // Another go-routine has set this and is either + // doing the write or waiting to re-acquire the + // lock post write. Release lock to give it a + // chance to complete. + c.mu.Unlock() + runtime.Gosched() + c.mu.Lock() + } if closed = c.flags.isSet(clearConnection); closed { break }