mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Merge pull request #2066 from nats-io/ln_cluster_loop_detection
[FIXED] Leafnode: incorrect loop detection in multi-cluster setup
This commit is contained in:
@@ -1256,10 +1256,13 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
|
||||
|
||||
// Now walk the results and add them to our smap
|
||||
c.mu.Lock()
|
||||
rc := c.leaf.remoteCluster
|
||||
c.leaf.smap = make(map[string]int32)
|
||||
for _, sub := range subs {
|
||||
// We ignore ourselves here.
|
||||
if c != sub.client {
|
||||
// Also don't add the subscription if it has a origin cluster and the
|
||||
// cluster name matches the one of the client we are sending to.
|
||||
if c != sub.client && (sub.origin == nil || (string(sub.origin) != rc)) {
|
||||
c.leaf.smap[keyFromSub(sub)]++
|
||||
if c.leaf.tsub == nil {
|
||||
c.leaf.tsub = make(map[*subscription]struct{})
|
||||
|
||||
@@ -3066,3 +3066,222 @@ func TestLeafNodeStreamImport(t *testing.T) {
|
||||
|
||||
natsNexMsg(t, sub, time.Second)
|
||||
}
|
||||
|
||||
func TestLeafNodeRouteSubWithOrigin(t *testing.T) {
|
||||
lo1 := DefaultOptions()
|
||||
lo1.LeafNode.Host = "127.0.0.1"
|
||||
lo1.LeafNode.Port = -1
|
||||
lo1.Cluster.Name = "local"
|
||||
lo1.Cluster.Host = "127.0.0.1"
|
||||
lo1.Cluster.Port = -1
|
||||
l1 := RunServer(lo1)
|
||||
defer l1.Shutdown()
|
||||
|
||||
lo2 := DefaultOptions()
|
||||
lo2.LeafNode.Host = "127.0.0.1"
|
||||
lo2.LeafNode.Port = -1
|
||||
lo2.Cluster.Name = "local"
|
||||
lo2.Cluster.Host = "127.0.0.1"
|
||||
lo2.Cluster.Port = -1
|
||||
lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
|
||||
l2 := RunServer(lo2)
|
||||
defer l2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, l1, l2)
|
||||
|
||||
u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
|
||||
urls := []*url.URL{u1}
|
||||
|
||||
ro1 := DefaultOptions()
|
||||
ro1.Cluster.Name = "remote"
|
||||
ro1.Cluster.Host = "127.0.0.1"
|
||||
ro1.Cluster.Port = -1
|
||||
ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
|
||||
r1 := RunServer(ro1)
|
||||
defer r1.Shutdown()
|
||||
|
||||
checkLeafNodeConnected(t, r1)
|
||||
|
||||
nc := natsConnect(t, r1.ClientURL(), nats.NoReconnect())
|
||||
defer nc.Close()
|
||||
natsSubSync(t, nc, "foo")
|
||||
natsQueueSubSync(t, nc, "bar", "baz")
|
||||
checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
|
||||
checkSubInterest(t, l2, globalAccountName, "bar", time.Second)
|
||||
|
||||
// Now shutdown the leafnode and check that any subscription for $G on l2 are gone.
|
||||
r1.Shutdown()
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
acc := l2.GlobalAccount()
|
||||
if n := acc.TotalSubs(); n != 0 {
|
||||
return fmt.Errorf("Account %q should have 0 sub, got %v", acc.GetName(), n)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestLeafNodeLoopDetectionWithMultipleClusters(t *testing.T) {
|
||||
lo1 := DefaultOptions()
|
||||
lo1.LeafNode.Host = "127.0.0.1"
|
||||
lo1.LeafNode.Port = -1
|
||||
lo1.Cluster.Name = "local"
|
||||
lo1.Cluster.Host = "127.0.0.1"
|
||||
lo1.Cluster.Port = -1
|
||||
l1 := RunServer(lo1)
|
||||
defer l1.Shutdown()
|
||||
|
||||
lo2 := DefaultOptions()
|
||||
lo2.LeafNode.Host = "127.0.0.1"
|
||||
lo2.LeafNode.Port = -1
|
||||
lo2.Cluster.Name = "local"
|
||||
lo2.Cluster.Host = "127.0.0.1"
|
||||
lo2.Cluster.Port = -1
|
||||
lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
|
||||
l2 := RunServer(lo2)
|
||||
defer l2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, l1, l2)
|
||||
|
||||
u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
|
||||
u2, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo2.LeafNode.Port))
|
||||
urls := []*url.URL{u1, u2}
|
||||
|
||||
ro1 := DefaultOptions()
|
||||
ro1.Cluster.Name = "remote"
|
||||
ro1.Cluster.Host = "127.0.0.1"
|
||||
ro1.Cluster.Port = -1
|
||||
ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
|
||||
r1 := RunServer(ro1)
|
||||
defer r1.Shutdown()
|
||||
|
||||
l := &captureErrorLogger{errCh: make(chan string, 100)}
|
||||
r1.SetLogger(l, false, false)
|
||||
|
||||
ro2 := DefaultOptions()
|
||||
ro2.Cluster.Name = "remote"
|
||||
ro2.Cluster.Host = "127.0.0.1"
|
||||
ro2.Cluster.Port = -1
|
||||
ro2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", ro1.Cluster.Port))
|
||||
ro2.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
ro2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
|
||||
r2 := RunServer(ro2)
|
||||
defer r2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, r1, r2)
|
||||
checkLeafNodeConnected(t, r1)
|
||||
checkLeafNodeConnected(t, r2)
|
||||
|
||||
l1.Shutdown()
|
||||
|
||||
// Now wait for r1 and r2 to reconnect, they should not have a problem of loop detection.
|
||||
checkLeafNodeConnected(t, r1)
|
||||
checkLeafNodeConnected(t, r2)
|
||||
|
||||
// Wait and make sure we don't have a loop error
|
||||
timeout := time.NewTimer(500 * time.Millisecond)
|
||||
for {
|
||||
select {
|
||||
case err := <-l.errCh:
|
||||
if strings.Contains(err, "Loop detected") {
|
||||
t.Fatal(err)
|
||||
}
|
||||
case <-timeout.C:
|
||||
// OK, we are done.
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeUnsubOnRouteDisconnect(t *testing.T) {
|
||||
lo1 := DefaultOptions()
|
||||
lo1.LeafNode.Host = "127.0.0.1"
|
||||
lo1.LeafNode.Port = -1
|
||||
lo1.Cluster.Name = "local"
|
||||
lo1.Cluster.Host = "127.0.0.1"
|
||||
lo1.Cluster.Port = -1
|
||||
l1 := RunServer(lo1)
|
||||
defer l1.Shutdown()
|
||||
|
||||
lo2 := DefaultOptions()
|
||||
lo2.LeafNode.Host = "127.0.0.1"
|
||||
lo2.LeafNode.Port = -1
|
||||
lo2.Cluster.Name = "local"
|
||||
lo2.Cluster.Host = "127.0.0.1"
|
||||
lo2.Cluster.Port = -1
|
||||
lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
|
||||
l2 := RunServer(lo2)
|
||||
defer l2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, l1, l2)
|
||||
|
||||
u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
|
||||
u2, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo2.LeafNode.Port))
|
||||
urls := []*url.URL{u1, u2}
|
||||
|
||||
ro1 := DefaultOptions()
|
||||
// DefaultOptions sets a cluster name, so make sure they are different.
|
||||
// Also, we don't have r1 and r2 clustered in this test, so set port to 0.
|
||||
ro1.Cluster.Name = _EMPTY_
|
||||
ro1.Cluster.Port = 0
|
||||
ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
|
||||
r1 := RunServer(ro1)
|
||||
defer r1.Shutdown()
|
||||
|
||||
ro2 := DefaultOptions()
|
||||
ro1.Cluster.Name = _EMPTY_
|
||||
ro2.Cluster.Port = 0
|
||||
ro2.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
// Have this one point only to l2
|
||||
ro2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u2}}}
|
||||
r2 := RunServer(ro2)
|
||||
defer r2.Shutdown()
|
||||
|
||||
checkLeafNodeConnected(t, r1)
|
||||
checkLeafNodeConnected(t, r2)
|
||||
|
||||
// Create a subscription on r1.
|
||||
nc := natsConnect(t, r1.ClientURL())
|
||||
defer nc.Close()
|
||||
sub := natsSubSync(t, nc, "foo")
|
||||
natsFlush(t, nc)
|
||||
|
||||
checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
|
||||
checkSubInterest(t, r2, globalAccountName, "foo", time.Second)
|
||||
|
||||
nc2 := natsConnect(t, r2.ClientURL())
|
||||
defer nc2.Close()
|
||||
natsPub(t, nc, "foo", []byte("msg1"))
|
||||
|
||||
// Check message received
|
||||
natsNexMsg(t, sub, time.Second)
|
||||
|
||||
// Now shutdown l1, l2 should update subscription interest to r2.
|
||||
// When r1 reconnects to l2, subscription should be updated too.
|
||||
l1.Shutdown()
|
||||
|
||||
// Wait a bit (so that the check of interest is not OK just because
|
||||
// the route would not have been yet detected as broken), and check
|
||||
// interest still present on r2, l2.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
|
||||
checkSubInterest(t, r2, globalAccountName, "foo", time.Second)
|
||||
|
||||
// Check again that message received ok
|
||||
natsPub(t, nc, "foo", []byte("msg2"))
|
||||
natsNexMsg(t, sub, time.Second)
|
||||
|
||||
// Now close client. Interest should disappear on r2. Due to a bug,
|
||||
// it was not.
|
||||
nc.Close()
|
||||
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
acc := r2.GlobalAccount()
|
||||
if n := acc.Interest("foo"); n != 0 {
|
||||
return fmt.Errorf("Still interest on subject: %v", n)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -891,7 +891,8 @@ func (c *client) removeRemoteSubs() {
|
||||
ase := as[accountName]
|
||||
if ase == nil {
|
||||
if v, ok := srv.accounts.Load(accountName); ok {
|
||||
as[accountName] = &asubs{acc: v.(*Account), subs: []*subscription{sub}}
|
||||
ase = &asubs{acc: v.(*Account), subs: []*subscription{sub}}
|
||||
as[accountName] = ase
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
@@ -901,6 +902,7 @@ func (c *client) removeRemoteSubs() {
|
||||
if srv.gateway.enabled {
|
||||
srv.gatewayUpdateSubInterest(accountName, sub, -1)
|
||||
}
|
||||
srv.updateLeafNodes(ase.acc, sub, -1)
|
||||
}
|
||||
|
||||
// Now remove the subs by batch for each account sublist.
|
||||
@@ -1077,9 +1079,9 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
|
||||
// We store local subs by account and subject and optionally queue name.
|
||||
// If we have a queue it will have a trailing weight which we do not want.
|
||||
if sub.queue != nil {
|
||||
sub.sid = arg[:len(arg)-len(args[3+off])-1]
|
||||
sub.sid = arg[len(sub.origin)+off : len(arg)-len(args[3+off])-1]
|
||||
} else {
|
||||
sub.sid = arg
|
||||
sub.sid = arg[len(sub.origin)+off:]
|
||||
}
|
||||
key := string(sub.sid)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user