Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-06-19 16:42:39 -07:00
parent 5598d5c711
commit 37352edff0
3 changed files with 117 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
# Cluster Server A
listen: localhost:-1
cluster {
listen: 127.0.0.1:7244
}

View File

@@ -1389,6 +1389,96 @@ func TestConfigReloadClusterRoutes(t *testing.T) {
}
}
// Ensure Reload supports removing a solicited route. In this case from A->B
// Test this by starting two servers in a cluster, ensuring messages flow between them.
// Then stop server B, and have server A continue to try to connect. Reload A with a config
// that removes the route and make sure it does not connect to server B when its restarted.
func TestConfigReloadClusterRemoveSolicitedRoutes(t *testing.T) {
srvb, srvbOpts, srvbConfig := runServerWithSymlinkConfig(t, "tmp_b.conf", "./configs/reload/srv_b_1.conf")
defer os.Remove(srvbConfig)
defer srvb.Shutdown()
srva, srvaOpts, srvaConfig := runServerWithSymlinkConfig(t, "tmp_a.conf", "./configs/reload/srv_a_1.conf")
defer os.Remove(srvaConfig)
defer srva.Shutdown()
checkClusterFormed(t, srva, srvb)
srvaAddr := fmt.Sprintf("nats://%s:%d", srvaOpts.Host, srvaOpts.Port)
srvaConn, err := nats.Connect(srvaAddr)
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer srvaConn.Close()
sub, err := srvaConn.SubscribeSync("foo")
if err != nil {
t.Fatalf("Error subscribing: %v", err)
}
defer sub.Unsubscribe()
if err := srvaConn.Flush(); err != nil {
t.Fatalf("Error flushing: %v", err)
}
srvbAddr := fmt.Sprintf("nats://%s:%d", srvbOpts.Host, srvbOpts.Port)
srvbConn, err := nats.Connect(srvbAddr)
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer srvbConn.Close()
if err := srvbConn.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error publishing: %v", err)
}
srvbConn.Flush()
msg, err := sub.NextMsg(5 * time.Second)
if err != nil {
t.Fatalf("Error receiving message: %v", err)
}
if string(msg.Data) != "hello" {
t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("hello"), msg.Data)
}
// Now stop server B.
srvb.Shutdown()
// Wait til route is dropped.
numRoutes := 0
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if numRoutes = srva.NumRoutes(); numRoutes == 0 {
break
} else {
time.Sleep(100 * time.Millisecond)
}
}
if numRoutes != 0 {
t.Fatalf("Expected 0 routes for server A, got %d", numRoutes)
}
// Now change config for server A to not solicit a route to server B.
createSymlink(t, srvaConfig, "./configs/reload/srv_a_4.conf")
defer os.Remove(srvaConfig)
if err := srva.Reload(); err != nil {
t.Fatalf("Error reloading config: %v", err)
}
// Restart server B.
go srvb.Start()
// We should not have a cluster formed here.
deadline = time.Now().Add(2 * DEFAULT_ROUTE_RECONNECT)
for time.Now().Before(deadline) {
if numRoutes = srva.NumRoutes(); numRoutes != 0 {
break
} else {
time.Sleep(100 * time.Millisecond)
}
}
if numRoutes != 0 {
t.Fatalf("Expected 0 routes for server A, got %d", numRoutes)
}
}
func reloadUpdateConfig(t *testing.T, s *Server, conf, content string) {
if err := ioutil.WriteFile(conf, []byte(content), 0666); err != nil {
stackFatalf(t, "Error creating config file: %v", err)

View File

@@ -973,13 +973,27 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) {
s.connectToRoute(rURL, tryForEver)
}
// Checks to make sure the route is still valid.
func (s *Server) routeStillValid(rURL *url.URL) bool {
for _, ri := range s.getOpts().Routes {
if ri == rURL {
return true
}
}
return false
}
func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) {
// Snapshot server options.
opts := s.getOpts()
defer s.grWG.Done()
attempts := 0
for s.isRunning() && rURL != nil {
if tryForEver && !s.routeStillValid(rURL) {
return
}
s.Debugf("Trying to connect to route on %s", rURL.Host)
conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL)
if err != nil {
@@ -1000,6 +1014,12 @@ func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) {
continue
}
}
if tryForEver && !s.routeStillValid(rURL) {
conn.Close()
return
}
// We have a route connection here.
// Go ahead and create it and exit this func.
s.createRoute(conn, rURL)