mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 02:07:59 -07:00
Merge pull request #1070 from nats-io/leafcycle
Prevent multiple solicited leafnodes from forming cycles.
This commit is contained in:
@@ -2426,8 +2426,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
msgh = append(msgh, ' ')
|
||||
si := len(msgh)
|
||||
|
||||
// For sending messages across routes. Reset it if we have one.
|
||||
// We reuse this data structure.
|
||||
// For sending messages across routes and leafnodes.
|
||||
// Reset if we have one since we reuse this data structure.
|
||||
if c.in.rts != nil {
|
||||
c.in.rts = c.in.rts[:0]
|
||||
}
|
||||
@@ -2438,10 +2438,9 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
// these after everything else.
|
||||
switch sub.client.kind {
|
||||
case ROUTER:
|
||||
if c.kind == ROUTER {
|
||||
continue
|
||||
if c.kind != ROUTER && !c.isSolicitedLeafNode() {
|
||||
c.addSubToRouteTargets(sub)
|
||||
}
|
||||
c.addSubToRouteTargets(sub)
|
||||
continue
|
||||
case GATEWAY:
|
||||
// Never send to gateway from here.
|
||||
@@ -2451,7 +2450,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
// Leaf node delivery audience is different however.
|
||||
// Also leaf nodes are always no echo, so we make sure we are not
|
||||
// going to send back to ourselves here.
|
||||
if c != sub.client {
|
||||
if c != sub.client && (c.kind != ROUTER || !c.isSolicitedLeafNode()) {
|
||||
c.addSubToRouteTargets(sub)
|
||||
}
|
||||
continue
|
||||
@@ -2576,7 +2575,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
|
||||
sendToRoutesOrLeafs:
|
||||
|
||||
// If no messages for routes return here.
|
||||
// If no messages for routes or leafnodes return here.
|
||||
if len(c.in.rts) == 0 {
|
||||
return queues
|
||||
}
|
||||
|
||||
@@ -52,6 +52,7 @@ type leafNodeCfg struct {
|
||||
curURL *url.URL
|
||||
}
|
||||
|
||||
// Check to see if this is a solicited leafnode. We do special processing for solicited.
|
||||
func (c *client) isSolicitedLeafNode() bool {
|
||||
return c.kind == LEAF && c.leaf.remote != nil
|
||||
}
|
||||
@@ -627,7 +628,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
c.registerWithAccount(c.acc)
|
||||
s.addLeafNodeConnection(c)
|
||||
s.initLeafNodeSmap(c)
|
||||
c.sendAllAccountSubs()
|
||||
c.sendAllLeafSubs()
|
||||
}
|
||||
|
||||
return c
|
||||
@@ -797,7 +798,7 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
|
||||
// We are good to go, send over all the bound account subscriptions.
|
||||
s.startGoRoutine(func() {
|
||||
c.sendAllAccountSubs()
|
||||
c.sendAllLeafSubs()
|
||||
s.grWG.Done()
|
||||
})
|
||||
|
||||
@@ -825,7 +826,12 @@ func (s *Server) initLeafNodeSmap(c *client) {
|
||||
ims := []string{}
|
||||
acc.mu.RLock()
|
||||
accName := acc.Name
|
||||
acc.sl.All(&subs)
|
||||
// If we are solicited we only send interest for local clients.
|
||||
if c.isSolicitedLeafNode() {
|
||||
acc.sl.localSubs(&subs)
|
||||
} else {
|
||||
acc.sl.All(&subs)
|
||||
}
|
||||
// Since leaf nodes only send on interest, if the bound
|
||||
// account has import services we need to send those over.
|
||||
for isubj := range acc.imports.services {
|
||||
@@ -894,7 +900,7 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
|
||||
_l := [32]*client{}
|
||||
leafs := _l[:0]
|
||||
|
||||
// Grab all leaf nodes. Ignore leafnode if sub's client is a leafnode and matches.
|
||||
// Grab all leaf nodes. Ignore a leafnode if sub's client is a leafnode and matches.
|
||||
acc.mu.RLock()
|
||||
for _, ln := range acc.lleafs {
|
||||
if ln != sub.client {
|
||||
@@ -909,11 +915,18 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
|
||||
}
|
||||
|
||||
// This will make an update to our internal smap and determine if we should send out
|
||||
// and interest update to the remote side.
|
||||
// an interest update to the remote side.
|
||||
func (c *client) updateSmap(sub *subscription, delta int32) {
|
||||
key := keyFromSub(sub)
|
||||
|
||||
c.mu.Lock()
|
||||
|
||||
// If we are solicited make sure this is a local client.
|
||||
if c.isSolicitedLeafNode() && sub.client.kind != CLIENT {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
n := c.leaf.smap[key]
|
||||
// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
|
||||
update := sub.queue != nil || n == 0 || n+delta <= 0
|
||||
@@ -956,8 +969,8 @@ func keyFromSub(sub *subscription) string {
|
||||
}
|
||||
|
||||
// Send all subscriptions for this account that include local
|
||||
// and all subscriptions besides our own.
|
||||
func (c *client) sendAllAccountSubs() {
|
||||
// and possibly all other remote subscriptions.
|
||||
func (c *client) sendAllLeafSubs() {
|
||||
// Hold all at once for now.
|
||||
var b bytes.Buffer
|
||||
|
||||
@@ -1084,23 +1097,25 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
atomic.StoreInt32(&osub.qw, sub.qw)
|
||||
acc.sl.UpdateRemoteQSub(osub)
|
||||
}
|
||||
|
||||
solicited := c.isSolicitedLeafNode()
|
||||
c.mu.Unlock()
|
||||
|
||||
// Treat leaf node subscriptions similar to a client subscription, meaning we
|
||||
// send them to both routes and gateways and other leaf nodes. We also do
|
||||
// the shadow subscriptions.
|
||||
if err := c.addShadowSubscriptions(acc, sub); err != nil {
|
||||
c.Errorf(err.Error())
|
||||
}
|
||||
// If we are routing add to the route map for the associated account.
|
||||
srv.updateRouteSubscriptionMap(acc, sub, 1)
|
||||
if updateGWs {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, 1)
|
||||
}
|
||||
// Now check on leafnode updates for other leaf nodes.
|
||||
srv.updateLeafNodes(acc, sub, 1)
|
||||
|
||||
// If we are not solicited, treat leaf node subscriptions similar to a
|
||||
// client subscription, meaning we forward them to routes, gateways and
|
||||
// other leaf nodes as needed.
|
||||
if !solicited {
|
||||
// If we are routing add to the route map for the associated account.
|
||||
srv.updateRouteSubscriptionMap(acc, sub, 1)
|
||||
if updateGWs {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, 1)
|
||||
}
|
||||
// Now check on leafnode updates for other leaf nodes.
|
||||
srv.updateLeafNodes(acc, sub, 1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -40,7 +40,6 @@ func createLeafConn(t tLogger, host string, port int) net.Conn {
|
||||
|
||||
func testDefaultOptionsForLeafNodes() *server.Options {
|
||||
o := DefaultTestOptions
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
o.LeafNode.Host = o.Host
|
||||
o.LeafNode.Port = -1
|
||||
@@ -64,7 +63,6 @@ func runSolicitLeafServer(lso *server.Options) (*server.Server, *server.Options)
|
||||
|
||||
func runSolicitLeafServerToURL(surl string) (*server.Server, *server.Options) {
|
||||
o := DefaultTestOptions
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
rurl, _ := url.Parse(surl)
|
||||
o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}}
|
||||
@@ -419,9 +417,11 @@ func TestLeafNodeAndRoutes(t *testing.T) {
|
||||
expect(pongRe)
|
||||
expectNothing(t, lc)
|
||||
|
||||
send("UNSUB 2\r\n")
|
||||
send("UNSUB 2\r\nPING\r\n")
|
||||
expect(pongRe)
|
||||
expectNothing(t, lc)
|
||||
send("UNSUB 1\r\n")
|
||||
send("UNSUB 1\r\nPING\r\n")
|
||||
expect(pongRe)
|
||||
leafExpect(lunsubRe)
|
||||
|
||||
// Now put it back and test msg flow.
|
||||
@@ -522,7 +522,6 @@ type cluster struct {
|
||||
|
||||
func testDefaultClusterOptionsForLeafNodes() *server.Options {
|
||||
o := DefaultTestOptions
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
o.Cluster.Host = o.Host
|
||||
o.Cluster.Port = -1
|
||||
@@ -928,7 +927,6 @@ func TestLeafNodeBasicAuth(t *testing.T) {
|
||||
|
||||
func runTLSSolicitLeafServer(lso *server.Options) (*server.Server, *server.Options) {
|
||||
o := DefaultTestOptions
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
rurl, _ := url.Parse(fmt.Sprintf("nats-leaf://%s:%d", lso.LeafNode.Host, lso.LeafNode.Port))
|
||||
remote := &server.RemoteLeafOpts{URLs: []*url.URL{rurl}}
|
||||
@@ -2388,7 +2386,6 @@ func TestLeafNodeAndGatewayGlobalRouting(t *testing.T) {
|
||||
defer ncl.Close()
|
||||
|
||||
ncl.Subscribe("foo", func(m *nats.Msg) {
|
||||
fmt.Printf("Reply is %v\n", m.Reply)
|
||||
m.Respond([]byte("World"))
|
||||
})
|
||||
|
||||
@@ -2482,3 +2479,156 @@ func TestLeafNodeMultipleRemoteURLs(t *testing.T) {
|
||||
|
||||
checkLeafNodeConnected(t, s)
|
||||
}
|
||||
|
||||
func runSolicitLeafCluster(t *testing.T, clusterName string, d1, d2 *cluster) *cluster {
|
||||
c := &cluster{servers: make([]*server.Server, 0, 2), opts: make([]*server.Options, 0, 2), name: clusterName}
|
||||
|
||||
// Who we will solicit for server 1
|
||||
ci := rand.Intn(len(d1.opts))
|
||||
opts := d1.opts[ci]
|
||||
surl := fmt.Sprintf("nats-leaf://%s:%d", opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
|
||||
o := DefaultTestOptions
|
||||
o.Port = -1
|
||||
rurl, _ := url.Parse(surl)
|
||||
o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}}
|
||||
o.LeafNode.ReconnectInterval = 100 * time.Millisecond
|
||||
o.Cluster.Host = o.Host
|
||||
o.Cluster.Port = -1
|
||||
s := RunServer(&o)
|
||||
checkLeafNodeConnected(t, d1.servers[ci])
|
||||
|
||||
c.servers = append(c.servers, s)
|
||||
c.opts = append(c.opts, &o)
|
||||
|
||||
// Grab route info
|
||||
routeAddr := fmt.Sprintf("nats-route://%s:%d", o.Cluster.Host, o.Cluster.Port)
|
||||
curl, _ := url.Parse(routeAddr)
|
||||
|
||||
// Who we will solicit for server 2
|
||||
ci = rand.Intn(len(d2.opts))
|
||||
opts = d2.opts[ci]
|
||||
surl = fmt.Sprintf("nats-leaf://%s:%d", opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
|
||||
// This is for the case were d1 == d2 and we select the same server.
|
||||
plfn := d2.servers[ci].NumLeafNodes()
|
||||
|
||||
o2 := DefaultTestOptions
|
||||
o2.Port = -1
|
||||
rurl, _ = url.Parse(surl)
|
||||
o2.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}}
|
||||
o2.LeafNode.ReconnectInterval = 100 * time.Millisecond
|
||||
o2.Cluster.Host = o.Host
|
||||
o2.Cluster.Port = -1
|
||||
o2.Routes = []*url.URL{curl}
|
||||
s = RunServer(&o2)
|
||||
|
||||
if plfn == 0 {
|
||||
checkLeafNodeConnected(t, d2.servers[ci])
|
||||
} else {
|
||||
checkLeafNode2Connected(t, d2.servers[ci])
|
||||
}
|
||||
|
||||
c.servers = append(c.servers, s)
|
||||
c.opts = append(c.opts, &o2)
|
||||
|
||||
checkClusterFormed(t, c.servers...)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func clientForCluster(t *testing.T, c *cluster) *nats.Conn {
|
||||
t.Helper()
|
||||
opts := c.opts[rand.Intn(len(c.opts))]
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
return nc
|
||||
}
|
||||
|
||||
func TestLeafNodeCycleWithSolicited(t *testing.T) {
|
||||
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
|
||||
// Accepting leafnode cluster, e.g. NGS
|
||||
ca := createClusterWithName(t, "A", 3)
|
||||
defer shutdownCluster(ca)
|
||||
cb := createClusterWithName(t, "B", 3, ca)
|
||||
defer shutdownCluster(cb)
|
||||
|
||||
// Create the responders.
|
||||
requestsReceived := int32(0)
|
||||
|
||||
nc := clientForCluster(t, ca)
|
||||
defer nc.Close()
|
||||
nc.QueueSubscribe("request", "cycles", func(m *nats.Msg) {
|
||||
atomic.AddInt32(&requestsReceived, 1)
|
||||
m.Respond([]byte("22"))
|
||||
})
|
||||
nc.Flush()
|
||||
|
||||
nc = clientForCluster(t, cb)
|
||||
defer nc.Close()
|
||||
nc.QueueSubscribe("request", "cycles", func(m *nats.Msg) {
|
||||
atomic.AddInt32(&requestsReceived, 1)
|
||||
m.Respond([]byte("33"))
|
||||
})
|
||||
nc.Flush()
|
||||
|
||||
// Soliciting cluster, both solicited connected to the "A" cluster
|
||||
sc := runSolicitLeafCluster(t, "SC", ca, ca)
|
||||
defer shutdownCluster(sc)
|
||||
|
||||
// Connect a client to a random server in sc
|
||||
createClientAndRequest := func(c *cluster) (*nats.Conn, *nats.Subscription) {
|
||||
nc := clientForCluster(t, c)
|
||||
reply := nats.NewInbox()
|
||||
sub, err := nc.SubscribeSync(reply)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not subscribe: %v", err)
|
||||
}
|
||||
if err := nc.PublishRequest("request", reply, []byte("fingers crossed")); err != nil {
|
||||
t.Fatalf("Error sending request: %v", err)
|
||||
}
|
||||
return nc, sub
|
||||
}
|
||||
|
||||
verifyOneResponse := func(sub *nats.Subscription) {
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
m, _, err := sub.Pending()
|
||||
if err != nil {
|
||||
t.Fatalf("Error calling Pending(): %v", err)
|
||||
}
|
||||
if m > 1 {
|
||||
t.Fatalf("Received more then one response, cycle indicated: %d", m)
|
||||
}
|
||||
}
|
||||
|
||||
verifyRequestTotal := func(nre int32) {
|
||||
if nr := atomic.LoadInt32(&requestsReceived); nr != nre {
|
||||
t.Fatalf("Expected %d requests received, got %d", nre, nr)
|
||||
}
|
||||
}
|
||||
|
||||
// This should pass to here, but if we have a cycle things will be spinning and we will receive
|
||||
// too many responses when it should only be 1.
|
||||
nc, rsub := createClientAndRequest(sc)
|
||||
defer nc.Close()
|
||||
verifyOneResponse(rsub)
|
||||
verifyRequestTotal(1)
|
||||
|
||||
// Do a solicit across GW, so shut this one down.
|
||||
nc.Close()
|
||||
shutdownCluster(sc)
|
||||
|
||||
// Soliciting cluster, connect to different clusters across a GW.
|
||||
sc = runSolicitLeafCluster(t, "SC", ca, cb)
|
||||
defer shutdownCluster(sc)
|
||||
|
||||
nc, rsub = createClientAndRequest(sc)
|
||||
defer nc.Close()
|
||||
verifyOneResponse(rsub)
|
||||
verifyRequestTotal(2) // This is total since use same responders.
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user