Merge pull request #631 from nats-io/fix_connect_urls_notification_with_explicit_routes

[FIXED] Cluster toplogy change possibly not sent to clients
This commit is contained in:
Ivan Kozlovic
2018-03-04 14:54:54 -07:00
committed by GitHub
3 changed files with 62 additions and 3 deletions

View File

@@ -564,6 +564,9 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
remote.mu.Lock()
// r will be not nil if c.route.didSolicit was true
if r != nil {
// If we upgrade to solicited, we still want to keep the remote's
// connectURLs. So transfer those.
r.connectURLs = remote.route.connectURLs
remote.route = r
}
// This is to mitigate the issue where both sides add the route

View File

@@ -728,3 +728,58 @@ func TestRoutesToEachOther(t *testing.T) {
t.Log("Was not able to get duplicate route this time!")
}
}
func TestConnectULRsWithRoutesToEachOther(t *testing.T) {
optsA := DefaultOptions()
optsA.Host = "127.0.0.1"
optsA.Cluster.Port = 7246
optsA.Routes = RoutesFromStr("nats://127.0.0.1:7247")
optsB := DefaultOptions()
optsB.Host = "127.0.0.1"
optsB.Cluster.Port = 7247
optsB.Routes = RoutesFromStr("nats://127.0.0.1:7246")
// Start servers with go routines to increase change of
// each server connecting to each other at the same time.
srvA := New(optsA)
defer srvA.Shutdown()
srvB := New(optsB)
defer srvB.Shutdown()
go srvA.Start()
go srvB.Start()
// Wait for cluster to be formed
checkClusterFormed(t, srvA, srvB)
// Connect to serverB
url := fmt.Sprintf("nats://%s", srvB.Addr().String())
nc, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
ds := nc.Servers()
if len(ds) != 2 {
t.Fatalf("Expected 2 servers, got %v", ds)
}
// Shutdown server A and make sure that we are notfied
// that server A is no longer running.
srvA.Shutdown()
timeout := time.Now().Add(5 * time.Second)
ok := false
for time.Now().Before(timeout) {
ds = nc.Servers()
if len(ds) == 1 {
ok = true
break
}
time.Sleep(50 * time.Millisecond)
}
if !ok {
t.Fatalf("List of servers should be only 1, got %v", ds)
}
}

View File

@@ -255,10 +255,11 @@ func (s *Server) logPid() error {
func (s *Server) Start() {
s.Noticef("Starting nats-server version %s", VERSION)
s.Debugf("Go build version %s", s.info.GoVersion)
if gitCommit == "" {
gitCommit = "not set"
gc := gitCommit
if gc == "" {
gc = "not set"
}
s.Noticef("Git commit [%s]", gitCommit)
s.Noticef("Git commit [%s]", gc)
// Avoid RACE between Start() and Shutdown()
s.mu.Lock()