mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fixed GW implicit reconnection
PR #1412 had a fix for races during implicit GW reconnection. However, the fix was a bit too simplistic in that it was checking only if there was any inbound gateway to decide to try to reconnect an implicit disconnected GW. We need to check the name, not only presence of inbound GW connections. Related to #1412 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -679,7 +679,7 @@ func (s *Server) solicitGateway(cfg *gatewayCfg, firstConnect bool) {
|
||||
s.gateway.Lock()
|
||||
// We could have just accepted an inbound for this remote gateway.
|
||||
// So if there is an inbound, let's try again to connect.
|
||||
if len(s.gateway.in) > 0 {
|
||||
if s.gateway.hasInbound(cfg.Name) {
|
||||
s.gateway.Unlock()
|
||||
continue
|
||||
}
|
||||
@@ -697,6 +697,20 @@ func (s *Server) solicitGateway(cfg *gatewayCfg, firstConnect bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if there is an inbound for the given `name`.
|
||||
// Lock held on entry.
|
||||
func (g *srvGateway) hasInbound(name string) bool {
|
||||
for _, ig := range g.in {
|
||||
ig.mu.Lock()
|
||||
igname := ig.gw.name
|
||||
ig.mu.Unlock()
|
||||
if igname == name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Called when a gateway connection is either accepted or solicited.
|
||||
// If accepted, the gateway is marked as inbound.
|
||||
// If solicited, the gateway is marked as outbound.
|
||||
|
||||
@@ -1290,6 +1290,81 @@ func TestGatewayImplicitReconnectRace(t *testing.T) {
|
||||
waitForInboundGateways(t, sa2, 1, 2*time.Second)
|
||||
}
|
||||
|
||||
type gwReconnAttemptLogger struct {
|
||||
DummyLogger
|
||||
errCh chan string
|
||||
}
|
||||
|
||||
func (l *gwReconnAttemptLogger) Errorf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
if strings.Contains(msg, `Error connecting to implicit gateway "A"`) {
|
||||
select {
|
||||
case l.errCh <- msg:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayImplicitReconnectHonorConnectRetries(t *testing.T) {
|
||||
ob := testDefaultOptionsForGateway("B")
|
||||
ob.ReconnectErrorReports = 1
|
||||
ob.Gateway.ConnectRetries = 2
|
||||
sb := runGatewayServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
l := &gwReconnAttemptLogger{errCh: make(chan string, 3)}
|
||||
sb.SetLogger(l, true, false)
|
||||
|
||||
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
// Wait for the proper connections
|
||||
waitForOutboundGateways(t, sa, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, time.Second)
|
||||
waitForInboundGateways(t, sa, 1, time.Second)
|
||||
waitForInboundGateways(t, sb, 1, time.Second)
|
||||
|
||||
// Now have C connect to B.
|
||||
oc := testGatewayOptionsFromToWithServers(t, "C", "B", sb)
|
||||
sc := runGatewayServer(oc)
|
||||
defer sc.Shutdown()
|
||||
|
||||
// Wait for the proper connections
|
||||
waitForOutboundGateways(t, sa, 2, time.Second)
|
||||
waitForOutboundGateways(t, sb, 2, time.Second)
|
||||
waitForOutboundGateways(t, sc, 2, time.Second)
|
||||
waitForInboundGateways(t, sa, 2, time.Second)
|
||||
waitForInboundGateways(t, sb, 2, time.Second)
|
||||
waitForInboundGateways(t, sc, 2, time.Second)
|
||||
|
||||
// Shutdown sa now...
|
||||
sa.Shutdown()
|
||||
|
||||
// B will try to reconnect to A 3 times (we stop after attempts > ConnectRetries)
|
||||
timeout := time.NewTimer(time.Second)
|
||||
for i := 0; i < 3; i++ {
|
||||
select {
|
||||
case <-l.errCh:
|
||||
// OK
|
||||
case <-timeout.C:
|
||||
t.Fatal("Did not get debug trace about reconnect")
|
||||
}
|
||||
}
|
||||
// If we get 1 more, we have an issue!
|
||||
select {
|
||||
case e := <-l.errCh:
|
||||
t.Fatalf("Should not have attempted to reconnect: %q", e)
|
||||
case <-time.After(250 * time.Millisecond):
|
||||
// OK!
|
||||
}
|
||||
|
||||
waitForOutboundGateways(t, sb, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sb, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sc, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sc, 1, 2*time.Second)
|
||||
}
|
||||
|
||||
func TestGatewayURLsFromClusterSentInINFO(t *testing.T) {
|
||||
o2 := testDefaultOptionsForGateway("B")
|
||||
s2 := runGatewayServer(o2)
|
||||
|
||||
Reference in New Issue
Block a user