mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #1785 from nats-io/fix_gw_implicit_reconnect
Fixed GW implicit reconnection
This commit is contained in:
@@ -679,7 +679,7 @@ func (s *Server) solicitGateway(cfg *gatewayCfg, firstConnect bool) {
|
|||||||
s.gateway.Lock()
|
s.gateway.Lock()
|
||||||
// We could have just accepted an inbound for this remote gateway.
|
// We could have just accepted an inbound for this remote gateway.
|
||||||
// So if there is an inbound, let's try again to connect.
|
// So if there is an inbound, let's try again to connect.
|
||||||
if len(s.gateway.in) > 0 {
|
if s.gateway.hasInbound(cfg.Name) {
|
||||||
s.gateway.Unlock()
|
s.gateway.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -697,6 +697,20 @@ func (s *Server) solicitGateway(cfg *gatewayCfg, firstConnect bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns true if there is an inbound for the given `name`.
|
||||||
|
// Lock held on entry.
|
||||||
|
func (g *srvGateway) hasInbound(name string) bool {
|
||||||
|
for _, ig := range g.in {
|
||||||
|
ig.mu.Lock()
|
||||||
|
igname := ig.gw.name
|
||||||
|
ig.mu.Unlock()
|
||||||
|
if igname == name {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// Called when a gateway connection is either accepted or solicited.
|
// Called when a gateway connection is either accepted or solicited.
|
||||||
// If accepted, the gateway is marked as inbound.
|
// If accepted, the gateway is marked as inbound.
|
||||||
// If solicited, the gateway is marked as outbound.
|
// If solicited, the gateway is marked as outbound.
|
||||||
|
|||||||
@@ -1290,6 +1290,81 @@ func TestGatewayImplicitReconnectRace(t *testing.T) {
|
|||||||
waitForInboundGateways(t, sa2, 1, 2*time.Second)
|
waitForInboundGateways(t, sa2, 1, 2*time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type gwReconnAttemptLogger struct {
|
||||||
|
DummyLogger
|
||||||
|
errCh chan string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *gwReconnAttemptLogger) Errorf(format string, v ...interface{}) {
|
||||||
|
msg := fmt.Sprintf(format, v...)
|
||||||
|
if strings.Contains(msg, `Error connecting to implicit gateway "A"`) {
|
||||||
|
select {
|
||||||
|
case l.errCh <- msg:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGatewayImplicitReconnectHonorConnectRetries(t *testing.T) {
|
||||||
|
ob := testDefaultOptionsForGateway("B")
|
||||||
|
ob.ReconnectErrorReports = 1
|
||||||
|
ob.Gateway.ConnectRetries = 2
|
||||||
|
sb := runGatewayServer(ob)
|
||||||
|
defer sb.Shutdown()
|
||||||
|
|
||||||
|
l := &gwReconnAttemptLogger{errCh: make(chan string, 3)}
|
||||||
|
sb.SetLogger(l, true, false)
|
||||||
|
|
||||||
|
oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
|
||||||
|
sa := runGatewayServer(oa)
|
||||||
|
defer sa.Shutdown()
|
||||||
|
|
||||||
|
// Wait for the proper connections
|
||||||
|
waitForOutboundGateways(t, sa, 1, time.Second)
|
||||||
|
waitForOutboundGateways(t, sb, 1, time.Second)
|
||||||
|
waitForInboundGateways(t, sa, 1, time.Second)
|
||||||
|
waitForInboundGateways(t, sb, 1, time.Second)
|
||||||
|
|
||||||
|
// Now have C connect to B.
|
||||||
|
oc := testGatewayOptionsFromToWithServers(t, "C", "B", sb)
|
||||||
|
sc := runGatewayServer(oc)
|
||||||
|
defer sc.Shutdown()
|
||||||
|
|
||||||
|
// Wait for the proper connections
|
||||||
|
waitForOutboundGateways(t, sa, 2, time.Second)
|
||||||
|
waitForOutboundGateways(t, sb, 2, time.Second)
|
||||||
|
waitForOutboundGateways(t, sc, 2, time.Second)
|
||||||
|
waitForInboundGateways(t, sa, 2, time.Second)
|
||||||
|
waitForInboundGateways(t, sb, 2, time.Second)
|
||||||
|
waitForInboundGateways(t, sc, 2, time.Second)
|
||||||
|
|
||||||
|
// Shutdown sa now...
|
||||||
|
sa.Shutdown()
|
||||||
|
|
||||||
|
// B will try to reconnect to A 3 times (we stop after attempts > ConnectRetries)
|
||||||
|
timeout := time.NewTimer(time.Second)
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
select {
|
||||||
|
case <-l.errCh:
|
||||||
|
// OK
|
||||||
|
case <-timeout.C:
|
||||||
|
t.Fatal("Did not get debug trace about reconnect")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// If we get 1 more, we have an issue!
|
||||||
|
select {
|
||||||
|
case e := <-l.errCh:
|
||||||
|
t.Fatalf("Should not have attempted to reconnect: %q", e)
|
||||||
|
case <-time.After(250 * time.Millisecond):
|
||||||
|
// OK!
|
||||||
|
}
|
||||||
|
|
||||||
|
waitForOutboundGateways(t, sb, 1, 2*time.Second)
|
||||||
|
waitForInboundGateways(t, sb, 1, 2*time.Second)
|
||||||
|
waitForOutboundGateways(t, sc, 1, 2*time.Second)
|
||||||
|
waitForInboundGateways(t, sc, 1, 2*time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
func TestGatewayURLsFromClusterSentInINFO(t *testing.T) {
|
func TestGatewayURLsFromClusterSentInINFO(t *testing.T) {
|
||||||
o2 := testDefaultOptionsForGateway("B")
|
o2 := testDefaultOptionsForGateway("B")
|
||||||
s2 := runGatewayServer(o2)
|
s2 := runGatewayServer(o2)
|
||||||
|
|||||||
Reference in New Issue
Block a user