diff --git a/server/gateway.go b/server/gateway.go
index 1629998c..e41de23d 100644
--- a/server/gateway.go
+++ b/server/gateway.go
@@ -679,7 +679,7 @@ func (s *Server) solicitGateway(cfg *gatewayCfg, firstConnect bool) {
 				s.gateway.Lock()
 				// We could have just accepted an inbound for this remote gateway.
 				// So if there is an inbound, let's try again to connect.
-				if len(s.gateway.in) > 0 {
+				if s.gateway.hasInbound(cfg.Name) {
 					s.gateway.Unlock()
 					continue
 				}
@@ -697,6 +697,20 @@ func (s *Server) solicitGateway(cfg *gatewayCfg, firstConnect bool) {
 	}
 }
 
+// hasInbound reports whether g.in holds an inbound connection whose gateway name equals name.
+// The caller must hold g's lock; each connection's own mutex is taken only to read its name.
+func (g *srvGateway) hasInbound(name string) bool {
+	for _, ig := range g.in {
+		ig.mu.Lock()
+		igname := ig.gw.name
+		ig.mu.Unlock()
+		if igname == name {
+			return true
+		}
+	}
+	return false
+}
+
 // Called when a gateway connection is either accepted or solicited.
 // If accepted, the gateway is marked as inbound.
 // If solicited, the gateway is marked as outbound.
diff --git a/server/gateway_test.go b/server/gateway_test.go
index fd0eeeb8..c634df28 100644
--- a/server/gateway_test.go
+++ b/server/gateway_test.go
@@ -1290,6 +1290,81 @@ func TestGatewayImplicitReconnectRace(t *testing.T) {
 	waitForInboundGateways(t, sa2, 1, 2*time.Second)
 }
 
+type gwReconnAttemptLogger struct {
+	DummyLogger
+	errCh chan string
+}
+
+func (l *gwReconnAttemptLogger) Errorf(format string, v ...interface{}) {
+	msg := fmt.Sprintf(format, v...)
+	if strings.Contains(msg, `Error connecting to implicit gateway "A"`) {
+		select {
+		case l.errCh <- msg:
+		default:
+		}
+	}
+}
+
+func TestGatewayImplicitReconnectHonorConnectRetries(t *testing.T) {
+	ob := testDefaultOptionsForGateway("B")
+	ob.ReconnectErrorReports = 1
+	ob.Gateway.ConnectRetries = 2
+	sb := runGatewayServer(ob)
+	defer sb.Shutdown()
+
+	l := &gwReconnAttemptLogger{errCh: make(chan string, 3)}
+	sb.SetLogger(l, true, false)
+
+	oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
+	sa := runGatewayServer(oa)
+	defer sa.Shutdown()
+
+	// Wait until A and B each have one outbound and one inbound gateway connection.
+	waitForOutboundGateways(t, sa, 1, time.Second)
+	waitForOutboundGateways(t, sb, 1, time.Second)
+	waitForInboundGateways(t, sa, 1, time.Second)
+	waitForInboundGateways(t, sb, 1, time.Second)
+
+	// Now have C connect to B.
+	oc := testGatewayOptionsFromToWithServers(t, "C", "B", sb)
+	sc := runGatewayServer(oc)
+	defer sc.Shutdown()
+
+	// Wait until A, B and C each have two outbound and two inbound gateway connections.
+	waitForOutboundGateways(t, sa, 2, time.Second)
+	waitForOutboundGateways(t, sb, 2, time.Second)
+	waitForOutboundGateways(t, sc, 2, time.Second)
+	waitForInboundGateways(t, sa, 2, time.Second)
+	waitForInboundGateways(t, sb, 2, time.Second)
+	waitForInboundGateways(t, sc, 2, time.Second)
+
+	// Shut down A so that B has to try reconnecting to it.
+	sa.Shutdown()
+
+	// With ConnectRetries set to 2, expect 3 logged failures from B (it stops once attempts > ConnectRetries).
+	timeout := time.NewTimer(time.Second)
+	for i := 0; i < 3; i++ {
+		select {
+		case <-l.errCh:
+			// Got one of the expected reconnect errors.
+		case <-timeout.C:
+			t.Fatal("Did not get debug trace about reconnect")
+		}
+	}
+	// A 4th error within 250ms would mean B kept retrying past ConnectRetries.
+	select {
+	case e := <-l.errCh:
+		t.Fatalf("Should not have attempted to reconnect: %q", e)
+	case <-time.After(250 * time.Millisecond):
+		// No further attempt was logged, as expected.
+	}
+
+	waitForOutboundGateways(t, sb, 1, 2*time.Second)
+	waitForInboundGateways(t, sb, 1, 2*time.Second)
+	waitForOutboundGateways(t, sc, 1, 2*time.Second)
+	waitForInboundGateways(t, sc, 1, 2*time.Second)
+}
+
 func TestGatewayURLsFromClusterSentInINFO(t *testing.T) {
 	o2 := testDefaultOptionsForGateway("B")
 	s2 := runGatewayServer(o2)