mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
[FIXED] Race condition during implicit Gateway reconnection
Say server in cluster A accepts a connection from a server in cluster B. The gateway is implicit, in that A does not have a configured remote gateway to B. Then the server in B is shutdown, which A detects and initiate a single reconnect attempt (since it is implicit and if the reconnect retries is not set). While this happens, a new server in B is restarted and connects to A. If that happens before the initial reconnect attempt failed, A will register that new inbound and do not attempt to solicit because it has already a remote entry for gateway B. At this point when the reconnect to old server B fails, then the remote GW entry is removed, and A will not create an outbound connection to the new B server. We fix that by checking if there is a registered inbound when we get to the point of removing the remote on a failed implicit reconnect. If there is one, we try the reconnection. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -457,6 +457,7 @@ func (s *Server) gatewayAcceptLoop(ch chan struct{}) {
|
||||
authRequired := opts.Gateway.Username != ""
|
||||
info := &Info{
|
||||
ID: s.info.ID,
|
||||
Name: opts.ServerName,
|
||||
Version: s.info.Version,
|
||||
AuthRequired: authRequired,
|
||||
TLSRequired: tlsReq,
|
||||
@@ -686,6 +687,12 @@ func (s *Server) solicitGateway(cfg *gatewayCfg, firstConnect bool) {
|
||||
if isImplicit {
|
||||
if opts.Gateway.ConnectRetries == 0 || attempts > opts.Gateway.ConnectRetries {
|
||||
s.gateway.Lock()
|
||||
// We could have just accepted an inbound for this remote gateway.
|
||||
// So if there is an inbound, let's try again to connect.
|
||||
if len(s.gateway.in) > 0 {
|
||||
s.gateway.Unlock()
|
||||
continue
|
||||
}
|
||||
delete(s.gateway.remotes, cfg.Name)
|
||||
s.gateway.Unlock()
|
||||
return
|
||||
@@ -1135,6 +1142,7 @@ func (s *Server) forwardNewGatewayToLocalCluster(oinfo *Info) {
|
||||
// the sent protocol will not have host/port defined.
|
||||
info := &Info{
|
||||
ID: "GW" + s.info.ID,
|
||||
Name: s.getOpts().ServerName,
|
||||
Gateway: oinfo.Gateway,
|
||||
GatewayURLs: oinfo.GatewayURLs,
|
||||
GatewayCmd: gatewayCmdGossip,
|
||||
|
||||
@@ -674,17 +674,28 @@ func TestGatewaySolicitDelayWithImplicitOutbounds(t *testing.T) {
|
||||
waitForInboundGateways(t, s1, 1, time.Second)
|
||||
}
|
||||
|
||||
type slowResolver struct{}
|
||||
type slowResolver struct {
|
||||
inLookupCh chan struct{}
|
||||
releaseCh chan struct{}
|
||||
}
|
||||
|
||||
func (r *slowResolver) LookupHost(ctx context.Context, h string) ([]string, error) {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
if r.inLookupCh != nil {
|
||||
select {
|
||||
case r.inLookupCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
<-r.releaseCh
|
||||
} else {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
return []string{h}, nil
|
||||
}
|
||||
|
||||
func TestGatewaySolicitShutdown(t *testing.T) {
|
||||
var urls []string
|
||||
for i := 0; i < 5; i++ {
|
||||
u := fmt.Sprintf("nats://127.0.0.1:%d", 1234+i)
|
||||
u := fmt.Sprintf("nats://localhost:%d", 1234+i)
|
||||
urls = append(urls, u)
|
||||
}
|
||||
o1 := testGatewayOptionsFromToWithURLs(t, "A", "B", urls)
|
||||
@@ -1234,6 +1245,55 @@ func TestGatewayImplicitReconnect(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayImplicitReconnectRace(t *testing.T) {
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
resolver := &slowResolver{
|
||||
inLookupCh: make(chan struct{}, 1),
|
||||
releaseCh: make(chan struct{}),
|
||||
}
|
||||
ob.Gateway.resolver = resolver
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
oa1 := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
|
||||
sa1 := runGatewayServer(oa1)
|
||||
defer sa1.Shutdown()
|
||||
|
||||
// Wait for the proper connections
|
||||
waitForOutboundGateways(t, sa1, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, time.Second)
|
||||
waitForInboundGateways(t, sa1, 1, time.Second)
|
||||
waitForInboundGateways(t, sb, 1, time.Second)
|
||||
|
||||
// On sb, change the URL to sa1 so that it is a name, instead of an IP,
|
||||
// so that we hit the slow resolver.
|
||||
cfg := sb.getRemoteGateway("A")
|
||||
cfg.updateURLs([]string{fmt.Sprintf("localhost:%d", sa1.GatewayAddr().Port)})
|
||||
|
||||
// Shutdown sa1 now...
|
||||
sa1.Shutdown()
|
||||
|
||||
// Wait to be notified that B has detected the connection close
|
||||
// and it is trying to resolve the host during the reconnect.
|
||||
<-resolver.inLookupCh
|
||||
|
||||
// Start a new "A" server (sa2).
|
||||
oa2 := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
|
||||
sa2 := runGatewayServer(oa2)
|
||||
defer sa2.Shutdown()
|
||||
|
||||
// Make sure we have our outbound to sb registered on sa2 and inbound
|
||||
// from sa2 on sb before releasing the resolver.
|
||||
waitForOutboundGateways(t, sa2, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sb, 1, 2*time.Second)
|
||||
|
||||
// Now release the resolver and ensure we have all connections.
|
||||
close(resolver.releaseCh)
|
||||
|
||||
waitForOutboundGateways(t, sb, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sa2, 1, 2*time.Second)
|
||||
}
|
||||
|
||||
func TestGatewayURLsFromClusterSentInINFO(t *testing.T) {
|
||||
o2 := testDefaultOptionsForGateway("B")
|
||||
s2 := runGatewayServer(o2)
|
||||
|
||||
@@ -740,6 +740,7 @@ func createClusterEx(t *testing.T, doAccounts bool, gwSolicit time.Duration, wai
|
||||
// All of these need system accounts.
|
||||
o.Accounts, o.Users = createAccountsAndUsers()
|
||||
o.SystemAccount = "$SYS"
|
||||
o.ServerName = fmt.Sprintf("%s1", clusterName)
|
||||
// Run the server
|
||||
s := RunServer(o)
|
||||
bindGlobal(s)
|
||||
@@ -761,6 +762,7 @@ func createClusterEx(t *testing.T, doAccounts bool, gwSolicit time.Duration, wai
|
||||
// All of these need system accounts.
|
||||
o.Accounts, o.Users = createAccountsAndUsers()
|
||||
o.SystemAccount = "$SYS"
|
||||
o.ServerName = fmt.Sprintf("%s%d", clusterName, i+1)
|
||||
s := RunServer(o)
|
||||
bindGlobal(s)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user