Merge pull request #1092 from nats-io/fix_duplicate_conn_close

[FIXED] Connection could be closed twice
This commit is contained in:
Ivan Kozlovic
2019-08-13 20:36:22 -06:00
committed by GitHub
7 changed files with 64 additions and 16 deletions

View File

@@ -22,7 +22,6 @@ import (
"math/rand"
"net"
"regexp"
"runtime"
"strings"
"sync"
"sync/atomic"
@@ -911,13 +910,6 @@ func (c *client) handlePartialWrite(pnb net.Buffers) {
// Lock must be held
func (c *client) flushOutbound() bool {
if c.flags.isSet(flushOutbound) {
// Another go-routine has set this and is either
// doing the write or waiting to re-acquire the
// lock post write. Release lock to give it a
// chance to complete.
c.mu.Unlock()
runtime.Gosched()
c.mu.Lock()
return false
}
c.flags.set(flushOutbound)

View File

@@ -40,7 +40,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.0.3"
VERSION = "2.0.4"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -953,10 +953,23 @@ func (c *client) processGatewayInfo(info *Info) {
// Register as an outbound gateway. If we had a protocol to ack our connect,
// then we should do that when we process that ack.
s.registerOutboundGatewayConnection(gwName, c)
c.Noticef("Outbound gateway connection to %q (%s) registered", gwName, info.ID)
// Now that the outbound gateway is registered, we can remove from temp map.
s.removeFromTempClients(cid)
if s.registerOutboundGatewayConnection(gwName, c) {
c.Noticef("Outbound gateway connection to %q (%s) registered", gwName, info.ID)
// Now that the outbound gateway is registered, we can remove from temp map.
s.removeFromTempClients(cid)
} else {
// There was a bug that could cause a connection to be closed
// twice, resulting in two reconnect attempts for the same
// outbound connection. The issue is fixed, but as defensive
// code: if we did not register this connection because we
// already have an outbound for this name, then close this
// connection (and make sure it does not try to reconnect).
c.mu.Lock()
c.flags.set(noReconnect)
c.mu.Unlock()
c.closeConnection(WrongGateway)
return
}
} else if info.GatewayCmd > 0 {
switch info.GatewayCmd {
case gatewayCmdAllSubsStart:
@@ -1476,12 +1489,17 @@ func (s *Server) registerInboundGatewayConnection(cid uint64, gwc *client) {
// Register the given gateway connection (*client) in the outbound gateways
// map with the given name as the key.
func (s *Server) registerOutboundGatewayConnection(name string, gwc *client) {
func (s *Server) registerOutboundGatewayConnection(name string, gwc *client) bool {
	// Reports true when gwc has been stored as the outbound connection for
	// name, and false (leaving the existing entry untouched) when an
	// outbound connection is already registered under that name.
	s.gateway.Lock()
	defer s.gateway.Unlock()

	if _, taken := s.gateway.out[name]; taken {
		return false
	}

	// Record the connection both in the by-name map and in the slice of
	// outbound connections, then re-order that slice while the gateway
	// lock is still held.
	s.gateway.out[name] = gwc
	s.gateway.outo = append(s.gateway.outo, gwc)
	s.gateway.orderOutboundConnectionsLocked()
	return true
}
// Returns the outbound gateway connection (*client) with the given name,

View File

@@ -4909,3 +4909,32 @@ func TestGatewayLogAccountInterestModeSwitch(t *testing.T) {
t.Fatalf("Attempted to switch while it was already in interest mode only")
}
}
// TestGatewaySingleOutbound starts a gateway server whose only remote is a
// plain TCP listener that this test never accepts on, then watches the
// goroutine dump for two seconds and fails if "reconnectGateway" shows up
// more than once in it.
func TestGatewaySingleOutbound(t *testing.T) {
	const marker = "reconnectGateway"

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("Error on listen: %v", err)
	}
	defer ln.Close()

	// Point gateway "A" at the listener above as its remote "B".
	remoteURL := fmt.Sprintf("nats://127.0.0.1:%d", ln.Addr().(*net.TCPAddr).Port)
	opts := testGatewayOptionsFromToWithTLS(t, "A", "B", []string{remoteURL})
	// Short TLS timeout; presumably this makes each connect attempt fail
	// quickly so that reconnects are triggered within the polling window.
	opts.Gateway.TLSTimeout = 0.1
	srv := runGatewayServer(opts)
	defer srv.Shutdown()

	// NOTE(review): runtime.Stack truncates its output to the buffer size,
	// so with many goroutines a fixed 10000 bytes may hide later stacks.
	stackBuf := make([]byte, 10000)

	// Poll for a short while, making sure the duplicate never appears.
	for deadline := time.Now().Add(2 * time.Second); time.Now().Before(deadline); time.Sleep(15 * time.Millisecond) {
		n := runtime.Stack(stackBuf, true)
		// The marker cannot overlap itself, so a count above one means
		// the first and last occurrences are at different offsets.
		if strings.Count(string(stackBuf[:n]), marker) > 1 {
			t.Fatalf("Trying to reconnect twice for the same outbound!")
		}
	}
}

View File

@@ -979,6 +979,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
<a href=/varz>varz</a><br/>
<a href=/connz>connz</a><br/>
<a href=/routez>routez</a><br/>
<a href=/gatewayz>gatewayz</a><br/>
<a href=/subsz>subsz</a><br/>
<br/>
<a href=https://nats-io.github.io/docs/nats_server/monitoring.html>help</a>

View File

@@ -1321,7 +1321,7 @@ func TestConfigureOptions(t *testing.T) {
// that Visit() stops when an error is found).
expectToFail([]string{"-cluster", ":", "-routes", ""}, "protocol")
expectToFail([]string{"-cluster", "nats://127.0.0.1", "-routes", ""}, "port")
expectToFail([]string{"-cluster", "nats://127.0.0.1:xxx", "-routes", ""}, "integer")
expectToFail([]string{"-cluster", "nats://127.0.0.1:xxx", "-routes", ""}, "invalid port")
expectToFail([]string{"-cluster", "nats://ivan:127.0.0.1:6222", "-routes", ""}, "colons")
expectToFail([]string{"-cluster", "nats://ivan@127.0.0.1:6222", "-routes", ""}, "password")

View File

@@ -1039,7 +1039,15 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra
// the lock, which could cause pingTimer to think that this
// connection is stale otherwise.
c.last = time.Now()
c.flushOutbound()
if !c.flushOutbound() {
// Another go-routine has set this and is either
// doing the write or waiting to re-acquire the
// lock post write. Release lock to give it a
// chance to complete.
c.mu.Unlock()
runtime.Gosched()
c.mu.Lock()
}
if closed = c.flags.isSet(clearConnection); closed {
break
}