From d24e9b75b3c092d30e64e74b400e5d07147c9635 Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Mon, 28 Dec 2020 12:28:55 -0700
Subject: [PATCH] Fixed GW implicit reconnection

PR #1412 had a fix for races during implicit GW reconnection.
However, the fix was a bit too simplistic in that it was checking
only if there was any inbound gateway to decide to try to reconnect
an implicit disconnected GW.
We need to check the name, not only presence of inbound GW connections.

Related to #1412

Signed-off-by: Ivan Kozlovic
---
 server/gateway.go      | 16 ++++++++-
 server/gateway_test.go | 75 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/server/gateway.go b/server/gateway.go
index 1629998c..e41de23d 100644
--- a/server/gateway.go
+++ b/server/gateway.go
@@ -679,7 +679,7 @@ func (s *Server) solicitGateway(cfg *gatewayCfg, firstConnect bool) {
 				s.gateway.Lock()
 				// We could have just accepted an inbound for this remote gateway.
 				// So if there is an inbound, let's try again to connect.
-				if len(s.gateway.in) > 0 {
+				if s.gateway.hasInbound(cfg.Name) {
 					s.gateway.Unlock()
 					continue
 				}
@@ -697,6 +697,20 @@ func (s *Server) solicitGateway(cfg *gatewayCfg, firstConnect bool) {
 	}
 }
 
+// hasInbound reports whether any connection in g.in has the gateway name `name`.
+// g's lock must be held on entry; each connection's lock is held while its name is read.
+func (g *srvGateway) hasInbound(name string) bool {
+	for _, c := range g.in {
+		c.mu.Lock()
+		found := c.gw.name == name
+		c.mu.Unlock()
+		if found {
+			return true
+		}
+	}
+	return false
+}
+
 // Called when a gateway connection is either accepted or solicited.
 // If accepted, the gateway is marked as inbound.
 // If solicited, the gateway is marked as outbound.
diff --git a/server/gateway_test.go b/server/gateway_test.go
index fd0eeeb8..c634df28 100644
--- a/server/gateway_test.go
+++ b/server/gateway_test.go
@@ -1290,6 +1290,81 @@ func TestGatewayImplicitReconnectRace(t *testing.T) {
 	waitForInboundGateways(t, sa2, 1, 2*time.Second)
 }
 
+type gwReconnAttemptLogger struct {
+	DummyLogger
+	errCh chan string // receives each captured error message; see Errorf
+}
+
+func (l *gwReconnAttemptLogger) Errorf(format string, v ...interface{}) {
+	msg := fmt.Sprintf(format, v...)
+	if strings.Contains(msg, `Error connecting to implicit gateway "A"`) {
+		select {
+		case l.errCh <- msg:
+		default: // errCh is full: drop the message rather than block the caller
+		}
+	}
+}
+
+func TestGatewayImplicitReconnectHonorConnectRetries(t *testing.T) {
+	ob := testDefaultOptionsForGateway("B")
+	ob.ReconnectErrorReports = 1
+	ob.Gateway.ConnectRetries = 2
+	sb := runGatewayServer(ob)
+	defer sb.Shutdown()
+
+	l := &gwReconnAttemptLogger{errCh: make(chan string, 3)}
+	sb.SetLogger(l, true, false)
+
+	oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
+	sa := runGatewayServer(oa)
+	defer sa.Shutdown()
+
+	// Wait for the proper connections
+	waitForOutboundGateways(t, sa, 1, time.Second)
+	waitForOutboundGateways(t, sb, 1, time.Second)
+	waitForInboundGateways(t, sa, 1, time.Second)
+	waitForInboundGateways(t, sb, 1, time.Second)
+
+	// Now have C connect to B.
+	oc := testGatewayOptionsFromToWithServers(t, "C", "B", sb)
+	sc := runGatewayServer(oc)
+	defer sc.Shutdown()
+
+	// Wait for the proper connections
+	waitForOutboundGateways(t, sa, 2, time.Second)
+	waitForOutboundGateways(t, sb, 2, time.Second)
+	waitForOutboundGateways(t, sc, 2, time.Second)
+	waitForInboundGateways(t, sa, 2, time.Second)
+	waitForInboundGateways(t, sb, 2, time.Second)
+	waitForInboundGateways(t, sc, 2, time.Second)
+
+	// Shutdown sa now...
+	sa.Shutdown()
+
+	// B will try to reconnect to A 3 times (we stop after attempts > ConnectRetries)
+	timeout := time.NewTimer(time.Second)
+	defer timeout.Stop()
+	for i := 0; i < 3; i++ {
+		select {
+		case <-l.errCh:
+		case <-timeout.C:
+			t.Fatalf("Did not get error log about reconnect attempt %d", i+1)
+		}
+	}
+	// If we get 1 more, we have an issue!
+	select {
+	case e := <-l.errCh:
+		t.Fatalf("Should not have attempted to reconnect: %q", e)
+	case <-time.After(250 * time.Millisecond):
+		// OK!
+	}
+
+	waitForOutboundGateways(t, sb, 1, 2*time.Second)
+	waitForInboundGateways(t, sb, 1, 2*time.Second)
+	waitForOutboundGateways(t, sc, 1, 2*time.Second)
+	waitForInboundGateways(t, sc, 1, 2*time.Second)
+}
+
 func TestGatewayURLsFromClusterSentInINFO(t *testing.T) {
 	o2 := testDefaultOptionsForGateway("B")
 	s2 := runGatewayServer(o2)