mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
[FIXED] Connection could be closed twice
This was introduced in PR#930. In the first commit, the route code
checked whether flushOutbound() returned false and, if so,
locally unlocked and re-locked the connection's lock. Unfortunately,
in the second commit (a6aeed3a6b) this logic was moved
into the flushOutbound() function itself.
As a result, closeConnection() could release the connection's lock
while calling flushOutbound(). If the connection was then closed
twice, for instance once due to a TLS timeout and once
explicitly, it would be scheduled for a reconnect twice
(for an explicit gateway connection, and possibly for a route).
Added defensive code in Gateway to register a unique outbound gateway.
Fixed a test that was failing with a newer Go version, in which
url.Parse() was fixed.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -22,7 +22,6 @@ import (
|
||||
"math/rand"
|
||||
"net"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -911,13 +910,6 @@ func (c *client) handlePartialWrite(pnb net.Buffers) {
|
||||
// Lock must be held
|
||||
func (c *client) flushOutbound() bool {
|
||||
if c.flags.isSet(flushOutbound) {
|
||||
// Another go-routine has set this and is either
|
||||
// doing the write or waiting to re-acquire the
|
||||
// lock post write. Release lock to give it a
|
||||
// chance to complete.
|
||||
c.mu.Unlock()
|
||||
runtime.Gosched()
|
||||
c.mu.Lock()
|
||||
return false
|
||||
}
|
||||
c.flags.set(flushOutbound)
|
||||
|
||||
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.0.3"
|
||||
VERSION = "2.0.4"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -953,10 +953,23 @@ func (c *client) processGatewayInfo(info *Info) {
|
||||
|
||||
// Register as an outbound gateway.. if we had a protocol to ack our connect,
|
||||
// then we should do that when process that ack.
|
||||
s.registerOutboundGatewayConnection(gwName, c)
|
||||
c.Noticef("Outbound gateway connection to %q (%s) registered", gwName, info.ID)
|
||||
// Now that the outbound gateway is registered, we can remove from temp map.
|
||||
s.removeFromTempClients(cid)
|
||||
if s.registerOutboundGatewayConnection(gwName, c) {
|
||||
c.Noticef("Outbound gateway connection to %q (%s) registered", gwName, info.ID)
|
||||
// Now that the outbound gateway is registered, we can remove from temp map.
|
||||
s.removeFromTempClients(cid)
|
||||
} else {
|
||||
// There was a bug that would cause a connection to possibly
|
||||
// be closed twice, resulting in two reconnections of the
|
||||
// same outbound connection. The issue is fixed, but adding
|
||||
// defensive code above that if we did not register this connection
|
||||
// because we already have an outbound for this name, then
|
||||
// close this connection (and make sure it does not try to reconnect)
|
||||
c.mu.Lock()
|
||||
c.flags.set(noReconnect)
|
||||
c.mu.Unlock()
|
||||
c.closeConnection(WrongGateway)
|
||||
return
|
||||
}
|
||||
} else if info.GatewayCmd > 0 {
|
||||
switch info.GatewayCmd {
|
||||
case gatewayCmdAllSubsStart:
|
||||
@@ -1476,12 +1489,17 @@ func (s *Server) registerInboundGatewayConnection(cid uint64, gwc *client) {
|
||||
|
||||
// Register the given gateway connection (*client) in the outbound gateways
|
||||
// map with the given name as the key.
|
||||
func (s *Server) registerOutboundGatewayConnection(name string, gwc *client) {
|
||||
func (s *Server) registerOutboundGatewayConnection(name string, gwc *client) bool {
|
||||
s.gateway.Lock()
|
||||
if _, exist := s.gateway.out[name]; exist {
|
||||
s.gateway.Unlock()
|
||||
return false
|
||||
}
|
||||
s.gateway.out[name] = gwc
|
||||
s.gateway.outo = append(s.gateway.outo, gwc)
|
||||
s.gateway.orderOutboundConnectionsLocked()
|
||||
s.gateway.Unlock()
|
||||
return true
|
||||
}
|
||||
|
||||
// Returns the outbound gateway connection (*client) with the given name,
|
||||
|
||||
@@ -4909,3 +4909,32 @@ func TestGatewayLogAccountInterestModeSwitch(t *testing.T) {
|
||||
t.Fatalf("Attempted to switch while it was already in interest mode only")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewaySingleOutbound(t *testing.T) {
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("Error on listen: %v", err)
|
||||
}
|
||||
defer l.Close()
|
||||
port := l.Addr().(*net.TCPAddr).Port
|
||||
|
||||
o1 := testGatewayOptionsFromToWithTLS(t, "A", "B", []string{fmt.Sprintf("nats://127.0.0.1:%d", port)})
|
||||
o1.Gateway.TLSTimeout = 0.1
|
||||
s1 := runGatewayServer(o1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
buf := make([]byte, 10000)
|
||||
// Check for a little bit that we don't have the situation
|
||||
timeout := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(timeout) {
|
||||
n := runtime.Stack(buf, true)
|
||||
index := strings.Index(string(buf[:n]), "reconnectGateway")
|
||||
if index != -1 {
|
||||
newIndex := strings.LastIndex(string(buf[:n]), "reconnectGateway")
|
||||
if newIndex > index {
|
||||
t.Fatalf("Trying to reconnect twice for the same outbound!")
|
||||
}
|
||||
}
|
||||
time.Sleep(15 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -979,6 +979,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
|
||||
<a href=/varz>varz</a><br/>
|
||||
<a href=/connz>connz</a><br/>
|
||||
<a href=/routez>routez</a><br/>
|
||||
<a href=/gatewayz>gatewayz</a><br/>
|
||||
<a href=/subsz>subsz</a><br/>
|
||||
<br/>
|
||||
<a href=https://nats-io.github.io/docs/nats_server/monitoring.html>help</a>
|
||||
|
||||
@@ -1321,7 +1321,7 @@ func TestConfigureOptions(t *testing.T) {
|
||||
// that Visit() stops when an error is found).
|
||||
expectToFail([]string{"-cluster", ":", "-routes", ""}, "protocol")
|
||||
expectToFail([]string{"-cluster", "nats://127.0.0.1", "-routes", ""}, "port")
|
||||
expectToFail([]string{"-cluster", "nats://127.0.0.1:xxx", "-routes", ""}, "integer")
|
||||
expectToFail([]string{"-cluster", "nats://127.0.0.1:xxx", "-routes", ""}, "invalid port")
|
||||
expectToFail([]string{"-cluster", "nats://ivan:127.0.0.1:6222", "-routes", ""}, "colons")
|
||||
expectToFail([]string{"-cluster", "nats://ivan@127.0.0.1:6222", "-routes", ""}, "password")
|
||||
|
||||
|
||||
@@ -1039,7 +1039,15 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra
|
||||
// the lock, which could cause pingTimer to think that this
|
||||
// connection is stale otherwise.
|
||||
c.last = time.Now()
|
||||
c.flushOutbound()
|
||||
if !c.flushOutbound() {
|
||||
// Another go-routine has set this and is either
|
||||
// doing the write or waiting to re-acquire the
|
||||
// lock post write. Release lock to give it a
|
||||
// chance to complete.
|
||||
c.mu.Unlock()
|
||||
runtime.Gosched()
|
||||
c.mu.Lock()
|
||||
}
|
||||
if closed = c.flags.isSet(clearConnection); closed {
|
||||
break
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user