diff --git a/server/client.go b/server/client.go index 45db8f86..e6beae89 100644 --- a/server/client.go +++ b/server/client.go @@ -4937,7 +4937,10 @@ func (c *client) reconnect() { return } if c.route != nil { - retryImplicit = c.route.retry + // A route is marked as solicited if it was given an URL to connect to, + // which would be the case even with implicit (due to gossip), so mark this + // as a retry for a route that is solicited and not explicit. + retryImplicit = c.route.retry || (c.route.didSolicit && c.route.routeType == Implicit) } kind := c.kind if kind == GATEWAY { diff --git a/server/routes_test.go b/server/routes_test.go index c5465b5b..ed3405ed 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1593,3 +1593,80 @@ func TestClusterQueueGroupWeightTrackingLeak(t *testing.T) { checkSubGone(s) checkSubGone(s2) } + +type testRouteReconnectLogger struct { + DummyLogger + ch chan string +} + +func (l *testRouteReconnectLogger) Debugf(format string, v ...interface{}) { + msg := fmt.Sprintf(format, v...) + if strings.Contains(msg, "Trying to connect to route") { + select { + case l.ch <- msg: + default: + } + } +} + +func TestRouteSolicitedReconnectsEvenIfImplicit(t *testing.T) { + o1 := DefaultOptions() + o1.ServerName = "A" + s1 := RunServer(o1) + defer s1.Shutdown() + + o2 := DefaultOptions() + o2.ServerName = "B" + o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port)) + // Not strictly required to reconnect, but if the reconnect were to fail for any reason + // then the server would retry only once and then stops. So set it to some higher value + // and then we will check that the server does not try more than that. + o2.Cluster.ConnectRetries = 3 + s2 := RunServer(o2) + defer s2.Shutdown() + + o3 := DefaultOptions() + o3.ServerName = "C" + o3.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port)) + o3.Cluster.ConnectRetries = 3 + s3 := RunServer(o3) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + s2.mu.Lock() + for _, r := range s2.routes { + r.mu.Lock() + // Close the route between S2 and S3 (that do not have explicit route to each other) + if r.route.remoteID == s3.ID() { + r.nc.Close() + } + r.mu.Unlock() + } + s2.mu.Unlock() + // Wait a bit to make sure that we don't check for cluster formed too soon (need to make + // sure that connection is really removed and reconnect mechanism starts). + time.Sleep(100 * time.Millisecond) + checkClusterFormed(t, s1, s2, s3) + + // Now shutdown server 3 and make sure that s2 stops trying to reconnect to s3 at one point + l := &testRouteReconnectLogger{ch: make(chan string, 10)} + s2.SetLogger(l, true, false) + s3.Shutdown() + // S2 should retry ConnectRetries+1 times and then stop + for i := 0; i < o2.Cluster.ConnectRetries+1; i++ { + select { + case <-l.ch: + case <-time.After(2 * time.Second): + t.Fatal("Did not attempt to reconnect") + } + } + // Now it should have stopped (in tests, reconnect delay is down to 15ms, so we don't need + // to wait for too long). + select { + case msg := <-l.ch: + t.Fatalf("Unexpected attempt to reconnect: %s", msg) + case <-time.After(50 * time.Millisecond): + // OK + } +}