diff --git a/server/configs/reload/srv_a_4.conf b/server/configs/reload/srv_a_4.conf new file mode 100644 index 00000000..4b63643a --- /dev/null +++ b/server/configs/reload/srv_a_4.conf @@ -0,0 +1,7 @@ +# Cluster Server A + +listen: localhost:-1 + +cluster { + listen: 127.0.0.1:7244 +} diff --git a/server/reload_test.go b/server/reload_test.go index 44692d7b..a7dada35 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -1389,6 +1389,96 @@ func TestConfigReloadClusterRoutes(t *testing.T) { } } +// Ensure Reload supports removing a solicited route. In this case from A->B +// Test this by starting two servers in a cluster, ensuring messages flow between them. +// Then stop server B, and have server A continue to try to connect. Reload A with a config +// that removes the route and make sure it does not connect to server B when its restarted. +func TestConfigReloadClusterRemoveSolicitedRoutes(t *testing.T) { + srvb, srvbOpts, srvbConfig := runServerWithSymlinkConfig(t, "tmp_b.conf", "./configs/reload/srv_b_1.conf") + defer os.Remove(srvbConfig) + defer srvb.Shutdown() + + srva, srvaOpts, srvaConfig := runServerWithSymlinkConfig(t, "tmp_a.conf", "./configs/reload/srv_a_1.conf") + defer os.Remove(srvaConfig) + defer srva.Shutdown() + + checkClusterFormed(t, srva, srvb) + + srvaAddr := fmt.Sprintf("nats://%s:%d", srvaOpts.Host, srvaOpts.Port) + srvaConn, err := nats.Connect(srvaAddr) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer srvaConn.Close() + sub, err := srvaConn.SubscribeSync("foo") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + defer sub.Unsubscribe() + if err := srvaConn.Flush(); err != nil { + t.Fatalf("Error flushing: %v", err) + } + + srvbAddr := fmt.Sprintf("nats://%s:%d", srvbOpts.Host, srvbOpts.Port) + srvbConn, err := nats.Connect(srvbAddr) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer srvbConn.Close() + + if err := srvbConn.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error publishing: %v", err) + } + srvbConn.Flush() + msg, err := sub.NextMsg(5 * time.Second) + if err != nil { + t.Fatalf("Error receiving message: %v", err) + } + if string(msg.Data) != "hello" { + t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("hello"), msg.Data) + } + + // Now stop server B. + srvb.Shutdown() + + // Wait til route is dropped. + numRoutes := 0 + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if numRoutes = srva.NumRoutes(); numRoutes == 0 { + break + } else { + time.Sleep(100 * time.Millisecond) + } + } + if numRoutes != 0 { + t.Fatalf("Expected 0 routes for server A, got %d", numRoutes) + } + + // Now change config for server A to not solicit a route to server B. + createSymlink(t, srvaConfig, "./configs/reload/srv_a_4.conf") + defer os.Remove(srvaConfig) + if err := srva.Reload(); err != nil { + t.Fatalf("Error reloading config: %v", err) + } + + // Restart server B. + go srvb.Start() + + // We should not have a cluster formed here. + deadline = time.Now().Add(2 * DEFAULT_ROUTE_RECONNECT) + for time.Now().Before(deadline) { + if numRoutes = srva.NumRoutes(); numRoutes != 0 { + break + } else { + time.Sleep(100 * time.Millisecond) + } + } + if numRoutes != 0 { + t.Fatalf("Expected 0 routes for server A, got %d", numRoutes) + } +} + func reloadUpdateConfig(t *testing.T, s *Server, conf, content string) { if err := ioutil.WriteFile(conf, []byte(content), 0666); err != nil { stackFatalf(t, "Error creating config file: %v", err) diff --git a/server/route.go b/server/route.go index 3fec8388..710d3047 100644 --- a/server/route.go +++ b/server/route.go @@ -973,13 +973,27 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) { s.connectToRoute(rURL, tryForEver) } +// Checks to make sure the route is still valid. +func (s *Server) routeStillValid(rURL *url.URL) bool { + for _, ri := range s.getOpts().Routes { + if ri == rURL { + return true + } + } + return false +} + func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) { // Snapshot server options. opts := s.getOpts() defer s.grWG.Done() + attempts := 0 for s.isRunning() && rURL != nil { + if tryForEver && !s.routeStillValid(rURL) { + return + } s.Debugf("Trying to connect to route on %s", rURL.Host) conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL) if err != nil { @@ -1000,6 +1014,12 @@ func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) { continue } } + + if tryForEver && !s.routeStillValid(rURL) { + conn.Close() + return + } + // We have a route connection here. // Go ahead and create it and exit this func. s.createRoute(conn, rURL)