mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 18:20:42 -07:00
Merge pull request #3573 from nats-io/route_impl_reconnect
[FIXED] Solicited route may not retry to reconnect
This commit is contained in:
@@ -4937,7 +4937,10 @@ func (c *client) reconnect() {
|
||||
return
|
||||
}
|
||||
if c.route != nil {
|
||||
retryImplicit = c.route.retry
|
||||
// A route is marked as solicited if it was given a URL to connect to,
|
||||
// which would be the case even with implicit (due to gossip), so mark this
|
||||
// as a retry for a route that is solicited and not explicit.
|
||||
retryImplicit = c.route.retry || (c.route.didSolicit && c.route.routeType == Implicit)
|
||||
}
|
||||
kind := c.kind
|
||||
if kind == GATEWAY {
|
||||
|
||||
@@ -1593,3 +1593,80 @@ func TestClusterQueueGroupWeightTrackingLeak(t *testing.T) {
|
||||
checkSubGone(s)
|
||||
checkSubGone(s2)
|
||||
}
|
||||
|
||||
type testRouteReconnectLogger struct {
|
||||
DummyLogger
|
||||
ch chan string
|
||||
}
|
||||
|
||||
func (l *testRouteReconnectLogger) Debugf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
if strings.Contains(msg, "Trying to connect to route") {
|
||||
select {
|
||||
case l.ch <- msg:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteSolicitedReconnectsEvenIfImplicit(t *testing.T) {
|
||||
o1 := DefaultOptions()
|
||||
o1.ServerName = "A"
|
||||
s1 := RunServer(o1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
o2 := DefaultOptions()
|
||||
o2.ServerName = "B"
|
||||
o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port))
|
||||
// Not strictly required to reconnect, but if the reconnect were to fail for any reason
|
||||
// then the server would retry only once and then stops. So set it to some higher value
|
||||
// and then we will check that the server does not try more than that.
|
||||
o2.Cluster.ConnectRetries = 3
|
||||
s2 := RunServer(o2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
o3 := DefaultOptions()
|
||||
o3.ServerName = "C"
|
||||
o3.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port))
|
||||
o3.Cluster.ConnectRetries = 3
|
||||
s3 := RunServer(o3)
|
||||
defer s3.Shutdown()
|
||||
|
||||
checkClusterFormed(t, s1, s2, s3)
|
||||
|
||||
s2.mu.Lock()
|
||||
for _, r := range s2.routes {
|
||||
r.mu.Lock()
|
||||
// Close the route between S2 and S3 (that do not have explicit route to each other)
|
||||
if r.route.remoteID == s3.ID() {
|
||||
r.nc.Close()
|
||||
}
|
||||
r.mu.Unlock()
|
||||
}
|
||||
s2.mu.Unlock()
|
||||
// Wait a bit to make sure that we don't check for cluster formed too soon (need to make
|
||||
// sure that connection is really removed and reconnect mechanism starts).
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
checkClusterFormed(t, s1, s2, s3)
|
||||
|
||||
// Now shutdown server 3 and make sure that s2 stops trying to reconnect to s3 at one point
|
||||
l := &testRouteReconnectLogger{ch: make(chan string, 10)}
|
||||
s2.SetLogger(l, true, false)
|
||||
s3.Shutdown()
|
||||
// S2 should retry ConnectRetries+1 times and then stop
|
||||
for i := 0; i < o2.Cluster.ConnectRetries+1; i++ {
|
||||
select {
|
||||
case <-l.ch:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Did not attempt to reconnect")
|
||||
}
|
||||
}
|
||||
// Now it should have stopped (in tests, reconnect delay is down to 15ms, so we don't need
|
||||
// to wait for too long).
|
||||
select {
|
||||
case msg := <-l.ch:
|
||||
t.Fatalf("Unexpected attempt to reconnect: %s", msg)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// OK
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user