mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] Leafnode: incorrect loop detection in multi-cluster setup
If leafnodes from a cluster were to reconnect to a server in a different cluster, it was possible for that server to send to the leafnodes some of their own subscriptions, which could cause an improper loop detection error. There was also a defect that would cause subscriptions over a route for leafnode subscriptions to be registered under the wrong key, which would lead to those subscriptions not being properly removed on route disconnect. Finally, during route disconnect, the leafnodes map was not updated. This PR fixes that too. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -3066,3 +3066,222 @@ func TestLeafNodeStreamImport(t *testing.T) {
|
||||
|
||||
natsNexMsg(t, sub, time.Second)
|
||||
}
|
||||
|
||||
func TestLeafNodeRouteSubWithOrigin(t *testing.T) {
|
||||
lo1 := DefaultOptions()
|
||||
lo1.LeafNode.Host = "127.0.0.1"
|
||||
lo1.LeafNode.Port = -1
|
||||
lo1.Cluster.Name = "local"
|
||||
lo1.Cluster.Host = "127.0.0.1"
|
||||
lo1.Cluster.Port = -1
|
||||
l1 := RunServer(lo1)
|
||||
defer l1.Shutdown()
|
||||
|
||||
lo2 := DefaultOptions()
|
||||
lo2.LeafNode.Host = "127.0.0.1"
|
||||
lo2.LeafNode.Port = -1
|
||||
lo2.Cluster.Name = "local"
|
||||
lo2.Cluster.Host = "127.0.0.1"
|
||||
lo2.Cluster.Port = -1
|
||||
lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
|
||||
l2 := RunServer(lo2)
|
||||
defer l2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, l1, l2)
|
||||
|
||||
u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
|
||||
urls := []*url.URL{u1}
|
||||
|
||||
ro1 := DefaultOptions()
|
||||
ro1.Cluster.Name = "remote"
|
||||
ro1.Cluster.Host = "127.0.0.1"
|
||||
ro1.Cluster.Port = -1
|
||||
ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
|
||||
r1 := RunServer(ro1)
|
||||
defer r1.Shutdown()
|
||||
|
||||
checkLeafNodeConnected(t, r1)
|
||||
|
||||
nc := natsConnect(t, r1.ClientURL(), nats.NoReconnect())
|
||||
defer nc.Close()
|
||||
natsSubSync(t, nc, "foo")
|
||||
natsQueueSubSync(t, nc, "bar", "baz")
|
||||
checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
|
||||
checkSubInterest(t, l2, globalAccountName, "bar", time.Second)
|
||||
|
||||
// Now shutdown the leafnode and check that any subscription for $G on l2 are gone.
|
||||
r1.Shutdown()
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
acc := l2.GlobalAccount()
|
||||
if n := acc.TotalSubs(); n != 0 {
|
||||
return fmt.Errorf("Account %q should have 0 sub, got %v", acc.GetName(), n)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestLeafNodeLoopDetectionWithMultipleClusters(t *testing.T) {
|
||||
lo1 := DefaultOptions()
|
||||
lo1.LeafNode.Host = "127.0.0.1"
|
||||
lo1.LeafNode.Port = -1
|
||||
lo1.Cluster.Name = "local"
|
||||
lo1.Cluster.Host = "127.0.0.1"
|
||||
lo1.Cluster.Port = -1
|
||||
l1 := RunServer(lo1)
|
||||
defer l1.Shutdown()
|
||||
|
||||
lo2 := DefaultOptions()
|
||||
lo2.LeafNode.Host = "127.0.0.1"
|
||||
lo2.LeafNode.Port = -1
|
||||
lo2.Cluster.Name = "local"
|
||||
lo2.Cluster.Host = "127.0.0.1"
|
||||
lo2.Cluster.Port = -1
|
||||
lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
|
||||
l2 := RunServer(lo2)
|
||||
defer l2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, l1, l2)
|
||||
|
||||
u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
|
||||
u2, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo2.LeafNode.Port))
|
||||
urls := []*url.URL{u1, u2}
|
||||
|
||||
ro1 := DefaultOptions()
|
||||
ro1.Cluster.Name = "remote"
|
||||
ro1.Cluster.Host = "127.0.0.1"
|
||||
ro1.Cluster.Port = -1
|
||||
ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
|
||||
r1 := RunServer(ro1)
|
||||
defer r1.Shutdown()
|
||||
|
||||
l := &captureErrorLogger{errCh: make(chan string, 100)}
|
||||
r1.SetLogger(l, false, false)
|
||||
|
||||
ro2 := DefaultOptions()
|
||||
ro2.Cluster.Name = "remote"
|
||||
ro2.Cluster.Host = "127.0.0.1"
|
||||
ro2.Cluster.Port = -1
|
||||
ro2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", ro1.Cluster.Port))
|
||||
ro2.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
ro2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
|
||||
r2 := RunServer(ro2)
|
||||
defer r2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, r1, r2)
|
||||
checkLeafNodeConnected(t, r1)
|
||||
checkLeafNodeConnected(t, r2)
|
||||
|
||||
l1.Shutdown()
|
||||
|
||||
// Now wait for r1 and r2 to reconnect, they should not have a problem of loop detection.
|
||||
checkLeafNodeConnected(t, r1)
|
||||
checkLeafNodeConnected(t, r2)
|
||||
|
||||
// Wait and make sure we don't have a loop error
|
||||
timeout := time.NewTimer(500 * time.Millisecond)
|
||||
for {
|
||||
select {
|
||||
case err := <-l.errCh:
|
||||
if strings.Contains(err, "Loop detected") {
|
||||
t.Fatal(err)
|
||||
}
|
||||
case <-timeout.C:
|
||||
// OK, we are done.
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeUnsubOnRouteDisconnect(t *testing.T) {
|
||||
lo1 := DefaultOptions()
|
||||
lo1.LeafNode.Host = "127.0.0.1"
|
||||
lo1.LeafNode.Port = -1
|
||||
lo1.Cluster.Name = "local"
|
||||
lo1.Cluster.Host = "127.0.0.1"
|
||||
lo1.Cluster.Port = -1
|
||||
l1 := RunServer(lo1)
|
||||
defer l1.Shutdown()
|
||||
|
||||
lo2 := DefaultOptions()
|
||||
lo2.LeafNode.Host = "127.0.0.1"
|
||||
lo2.LeafNode.Port = -1
|
||||
lo2.Cluster.Name = "local"
|
||||
lo2.Cluster.Host = "127.0.0.1"
|
||||
lo2.Cluster.Port = -1
|
||||
lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
|
||||
l2 := RunServer(lo2)
|
||||
defer l2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, l1, l2)
|
||||
|
||||
u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
|
||||
u2, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo2.LeafNode.Port))
|
||||
urls := []*url.URL{u1, u2}
|
||||
|
||||
ro1 := DefaultOptions()
|
||||
// DefaultOptions sets a cluster name, so make sure they are different.
|
||||
// Also, we don't have r1 and r2 clustered in this test, so set port to 0.
|
||||
ro1.Cluster.Name = _EMPTY_
|
||||
ro1.Cluster.Port = 0
|
||||
ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
|
||||
r1 := RunServer(ro1)
|
||||
defer r1.Shutdown()
|
||||
|
||||
ro2 := DefaultOptions()
|
||||
ro1.Cluster.Name = _EMPTY_
|
||||
ro2.Cluster.Port = 0
|
||||
ro2.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
// Have this one point only to l2
|
||||
ro2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u2}}}
|
||||
r2 := RunServer(ro2)
|
||||
defer r2.Shutdown()
|
||||
|
||||
checkLeafNodeConnected(t, r1)
|
||||
checkLeafNodeConnected(t, r2)
|
||||
|
||||
// Create a subscription on r1.
|
||||
nc := natsConnect(t, r1.ClientURL())
|
||||
defer nc.Close()
|
||||
sub := natsSubSync(t, nc, "foo")
|
||||
natsFlush(t, nc)
|
||||
|
||||
checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
|
||||
checkSubInterest(t, r2, globalAccountName, "foo", time.Second)
|
||||
|
||||
nc2 := natsConnect(t, r2.ClientURL())
|
||||
defer nc2.Close()
|
||||
natsPub(t, nc, "foo", []byte("msg1"))
|
||||
|
||||
// Check message received
|
||||
natsNexMsg(t, sub, time.Second)
|
||||
|
||||
// Now shutdown l1, l2 should update subscription interest to r2.
|
||||
// When r1 reconnects to l2, subscription should be updated too.
|
||||
l1.Shutdown()
|
||||
|
||||
// Wait a bit (so that the check of interest is not OK just because
|
||||
// the route would not have been yet detected as broken), and check
|
||||
// interest still present on r2, l2.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
|
||||
checkSubInterest(t, r2, globalAccountName, "foo", time.Second)
|
||||
|
||||
// Check again that message received ok
|
||||
natsPub(t, nc, "foo", []byte("msg2"))
|
||||
natsNexMsg(t, sub, time.Second)
|
||||
|
||||
// Now close client. Interest should disappear on r2. Due to a bug,
|
||||
// it was not.
|
||||
nc.Close()
|
||||
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
acc := r2.GlobalAccount()
|
||||
if n := acc.Interest("foo"); n != 0 {
|
||||
return fmt.Errorf("Still interest on subject: %v", n)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user