mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Prevent multiple solicited leafnodes from forming cycles.
When a solicited leafnode comes from multiple servers that themselves are a cluster, cycles were formed. This change allows solicited leafnodes to behave similarly to gateways, in that each server of a cluster is expected to have a solicited leafnode per destination account and cluster. We no longer forward subscription interest or messages to a cluster from a server that has a solicited leafnode. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -40,7 +40,6 @@ func createLeafConn(t tLogger, host string, port int) net.Conn {
|
||||
|
||||
func testDefaultOptionsForLeafNodes() *server.Options {
|
||||
o := DefaultTestOptions
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
o.LeafNode.Host = o.Host
|
||||
o.LeafNode.Port = -1
|
||||
@@ -64,7 +63,6 @@ func runSolicitLeafServer(lso *server.Options) (*server.Server, *server.Options)
|
||||
|
||||
func runSolicitLeafServerToURL(surl string) (*server.Server, *server.Options) {
|
||||
o := DefaultTestOptions
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
rurl, _ := url.Parse(surl)
|
||||
o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}}
|
||||
@@ -419,9 +417,11 @@ func TestLeafNodeAndRoutes(t *testing.T) {
|
||||
expect(pongRe)
|
||||
expectNothing(t, lc)
|
||||
|
||||
send("UNSUB 2\r\n")
|
||||
send("UNSUB 2\r\nPING\r\n")
|
||||
expect(pongRe)
|
||||
expectNothing(t, lc)
|
||||
send("UNSUB 1\r\n")
|
||||
send("UNSUB 1\r\nPING\r\n")
|
||||
expect(pongRe)
|
||||
leafExpect(lunsubRe)
|
||||
|
||||
// Now put it back and test msg flow.
|
||||
@@ -522,7 +522,6 @@ type cluster struct {
|
||||
|
||||
func testDefaultClusterOptionsForLeafNodes() *server.Options {
|
||||
o := DefaultTestOptions
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
o.Cluster.Host = o.Host
|
||||
o.Cluster.Port = -1
|
||||
@@ -928,7 +927,6 @@ func TestLeafNodeBasicAuth(t *testing.T) {
|
||||
|
||||
func runTLSSolicitLeafServer(lso *server.Options) (*server.Server, *server.Options) {
|
||||
o := DefaultTestOptions
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
rurl, _ := url.Parse(fmt.Sprintf("nats-leaf://%s:%d", lso.LeafNode.Host, lso.LeafNode.Port))
|
||||
remote := &server.RemoteLeafOpts{URLs: []*url.URL{rurl}}
|
||||
@@ -2388,7 +2386,6 @@ func TestLeafNodeAndGatewayGlobalRouting(t *testing.T) {
|
||||
defer ncl.Close()
|
||||
|
||||
ncl.Subscribe("foo", func(m *nats.Msg) {
|
||||
fmt.Printf("Reply is %v\n", m.Reply)
|
||||
m.Respond([]byte("World"))
|
||||
})
|
||||
|
||||
@@ -2482,3 +2479,156 @@ func TestLeafNodeMultipleRemoteURLs(t *testing.T) {
|
||||
|
||||
checkLeafNodeConnected(t, s)
|
||||
}
|
||||
|
||||
func runSolicitLeafCluster(t *testing.T, clusterName string, d1, d2 *cluster) *cluster {
|
||||
c := &cluster{servers: make([]*server.Server, 0, 2), opts: make([]*server.Options, 0, 2), name: clusterName}
|
||||
|
||||
// Who we will solicit for server 1
|
||||
ci := rand.Intn(len(d1.opts))
|
||||
opts := d1.opts[ci]
|
||||
surl := fmt.Sprintf("nats-leaf://%s:%d", opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
|
||||
o := DefaultTestOptions
|
||||
o.Port = -1
|
||||
rurl, _ := url.Parse(surl)
|
||||
o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}}
|
||||
o.LeafNode.ReconnectInterval = 100 * time.Millisecond
|
||||
o.Cluster.Host = o.Host
|
||||
o.Cluster.Port = -1
|
||||
s := RunServer(&o)
|
||||
checkLeafNodeConnected(t, d1.servers[ci])
|
||||
|
||||
c.servers = append(c.servers, s)
|
||||
c.opts = append(c.opts, &o)
|
||||
|
||||
// Grab route info
|
||||
routeAddr := fmt.Sprintf("nats-route://%s:%d", o.Cluster.Host, o.Cluster.Port)
|
||||
curl, _ := url.Parse(routeAddr)
|
||||
|
||||
// Who we will solicit for server 2
|
||||
ci = rand.Intn(len(d2.opts))
|
||||
opts = d2.opts[ci]
|
||||
surl = fmt.Sprintf("nats-leaf://%s:%d", opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
|
||||
// This is for the case were d1 == d2 and we select the same server.
|
||||
plfn := d2.servers[ci].NumLeafNodes()
|
||||
|
||||
o2 := DefaultTestOptions
|
||||
o2.Port = -1
|
||||
rurl, _ = url.Parse(surl)
|
||||
o2.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}}
|
||||
o2.LeafNode.ReconnectInterval = 100 * time.Millisecond
|
||||
o2.Cluster.Host = o.Host
|
||||
o2.Cluster.Port = -1
|
||||
o2.Routes = []*url.URL{curl}
|
||||
s = RunServer(&o2)
|
||||
|
||||
if plfn == 0 {
|
||||
checkLeafNodeConnected(t, d2.servers[ci])
|
||||
} else {
|
||||
checkLeafNode2Connected(t, d2.servers[ci])
|
||||
}
|
||||
|
||||
c.servers = append(c.servers, s)
|
||||
c.opts = append(c.opts, &o2)
|
||||
|
||||
checkClusterFormed(t, c.servers...)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func clientForCluster(t *testing.T, c *cluster) *nats.Conn {
|
||||
t.Helper()
|
||||
opts := c.opts[rand.Intn(len(c.opts))]
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
return nc
|
||||
}
|
||||
|
||||
func TestLeafNodeCycleWithSolicited(t *testing.T) {
|
||||
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
|
||||
// Accepting leafnode cluster, e.g. NGS
|
||||
ca := createClusterWithName(t, "A", 3)
|
||||
defer shutdownCluster(ca)
|
||||
cb := createClusterWithName(t, "B", 3, ca)
|
||||
defer shutdownCluster(cb)
|
||||
|
||||
// Create the responders.
|
||||
requestsReceived := int32(0)
|
||||
|
||||
nc := clientForCluster(t, ca)
|
||||
defer nc.Close()
|
||||
nc.QueueSubscribe("request", "cycles", func(m *nats.Msg) {
|
||||
atomic.AddInt32(&requestsReceived, 1)
|
||||
m.Respond([]byte("22"))
|
||||
})
|
||||
nc.Flush()
|
||||
|
||||
nc = clientForCluster(t, cb)
|
||||
defer nc.Close()
|
||||
nc.QueueSubscribe("request", "cycles", func(m *nats.Msg) {
|
||||
atomic.AddInt32(&requestsReceived, 1)
|
||||
m.Respond([]byte("33"))
|
||||
})
|
||||
nc.Flush()
|
||||
|
||||
// Soliciting cluster, both solicited connected to the "A" cluster
|
||||
sc := runSolicitLeafCluster(t, "SC", ca, ca)
|
||||
defer shutdownCluster(sc)
|
||||
|
||||
// Connect a client to a random server in sc
|
||||
createClientAndRequest := func(c *cluster) (*nats.Conn, *nats.Subscription) {
|
||||
nc := clientForCluster(t, c)
|
||||
reply := nats.NewInbox()
|
||||
sub, err := nc.SubscribeSync(reply)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not subscribe: %v", err)
|
||||
}
|
||||
if err := nc.PublishRequest("request", reply, []byte("fingers crossed")); err != nil {
|
||||
t.Fatalf("Error sending request: %v", err)
|
||||
}
|
||||
return nc, sub
|
||||
}
|
||||
|
||||
verifyOneResponse := func(sub *nats.Subscription) {
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
m, _, err := sub.Pending()
|
||||
if err != nil {
|
||||
t.Fatalf("Error calling Pending(): %v", err)
|
||||
}
|
||||
if m > 1 {
|
||||
t.Fatalf("Received more then one response, cycle indicated: %d", m)
|
||||
}
|
||||
}
|
||||
|
||||
verifyRequestTotal := func(nre int32) {
|
||||
if nr := atomic.LoadInt32(&requestsReceived); nr != nre {
|
||||
t.Fatalf("Expected %d requests received, got %d", nre, nr)
|
||||
}
|
||||
}
|
||||
|
||||
// This should pass to here, but if we have a cycle things will be spinning and we will receive
|
||||
// too many responses when it should only be 1.
|
||||
nc, rsub := createClientAndRequest(sc)
|
||||
defer nc.Close()
|
||||
verifyOneResponse(rsub)
|
||||
verifyRequestTotal(1)
|
||||
|
||||
// Do a solicit across GW, so shut this one down.
|
||||
nc.Close()
|
||||
shutdownCluster(sc)
|
||||
|
||||
// Soliciting cluster, connect to different clusters across a GW.
|
||||
sc = runSolicitLeafCluster(t, "SC", ca, cb)
|
||||
defer shutdownCluster(sc)
|
||||
|
||||
nc, rsub = createClientAndRequest(sc)
|
||||
defer nc.Close()
|
||||
verifyOneResponse(rsub)
|
||||
verifyRequestTotal(2) // This is total since use same responders.
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user