diff --git a/server/client.go b/server/client.go index 31aa5849..bc47664a 100644 --- a/server/client.go +++ b/server/client.go @@ -2426,8 +2426,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, msgh = append(msgh, ' ') si := len(msgh) - // For sending messages across routes. Reset it if we have one. - // We reuse this data structure. + // For sending messages across routes and leafnodes. + // Reset if we have one since we reuse this data structure. if c.in.rts != nil { c.in.rts = c.in.rts[:0] } @@ -2438,10 +2438,9 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // these after everything else. switch sub.client.kind { case ROUTER: - if c.kind == ROUTER { - continue + if c.kind != ROUTER && !c.isSolicitedLeafNode() { + c.addSubToRouteTargets(sub) } - c.addSubToRouteTargets(sub) continue case GATEWAY: // Never send to gateway from here. @@ -2451,7 +2450,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // Leaf node delivery audience is different however. // Also leaf nodes are always no echo, so we make sure we are not // going to send back to ourselves here. - if c != sub.client { + if c != sub.client && (c.kind != ROUTER || !c.isSolicitedLeafNode()) { c.addSubToRouteTargets(sub) } continue @@ -2576,7 +2575,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, sendToRoutesOrLeafs: - // If no messages for routes return here. + // If no messages for routes or leafnodes return here. if len(c.in.rts) == 0 { return queues } diff --git a/server/leafnode.go b/server/leafnode.go index b4c219f1..ac34f059 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -52,6 +52,7 @@ type leafNodeCfg struct { curURL *url.URL } +// Check to see if this is a solicited leafnode. We do special processing for solicited. 
func (c *client) isSolicitedLeafNode() bool { return c.kind == LEAF && c.leaf.remote != nil } @@ -627,7 +628,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client { c.registerWithAccount(c.acc) s.addLeafNodeConnection(c) s.initLeafNodeSmap(c) - c.sendAllAccountSubs() + c.sendAllLeafSubs() } return c @@ -797,7 +798,7 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro // We are good to go, send over all the bound account subscriptions. s.startGoRoutine(func() { - c.sendAllAccountSubs() + c.sendAllLeafSubs() s.grWG.Done() }) @@ -825,7 +826,12 @@ func (s *Server) initLeafNodeSmap(c *client) { ims := []string{} acc.mu.RLock() accName := acc.Name - acc.sl.All(&subs) + // If we are solicited we only send interest for local clients. + if c.isSolicitedLeafNode() { + acc.sl.localSubs(&subs) + } else { + acc.sl.All(&subs) + } // Since leaf nodes only send on interest, if the bound // account has import services we need to send those over. for isubj := range acc.imports.services { @@ -894,7 +900,7 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) { _l := [32]*client{} leafs := _l[:0] - // Grab all leaf nodes. Ignore leafnode if sub's client is a leafnode and matches. + // Grab all leaf nodes. Ignore a leafnode if sub's client is a leafnode and matches. acc.mu.RLock() for _, ln := range acc.lleafs { if ln != sub.client { @@ -909,11 +915,18 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) { } // This will make an update to our internal smap and determine if we should send out -// and interest update to the remote side. +// an interest update to the remote side. func (c *client) updateSmap(sub *subscription, delta int32) { key := keyFromSub(sub) c.mu.Lock() + + // If we are solicited make sure this is a local client. 
+ if c.isSolicitedLeafNode() && sub.client.kind != CLIENT { + c.mu.Unlock() + return + } + n := c.leaf.smap[key] // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0. update := sub.queue != nil || n == 0 || n+delta <= 0 @@ -956,8 +969,8 @@ func keyFromSub(sub *subscription) string { } // Send all subscriptions for this account that include local -// and all subscriptions besides our own. -func (c *client) sendAllAccountSubs() { +// and possibly all other remote subscriptions. +func (c *client) sendAllLeafSubs() { // Hold all at once for now. var b bytes.Buffer @@ -1084,23 +1097,25 @@ func (c *client) processLeafSub(argo []byte) (err error) { atomic.StoreInt32(&osub.qw, sub.qw) acc.sl.UpdateRemoteQSub(osub) } - + solicited := c.isSolicitedLeafNode() c.mu.Unlock() - // Treat leaf node subscriptions similar to a client subscription, meaning we - // send them to both routes and gateways and other leaf nodes. We also do - // the shadow subscriptions. if err := c.addShadowSubscriptions(acc, sub); err != nil { c.Errorf(err.Error()) } - // If we are routing add to the route map for the associated account. - srv.updateRouteSubscriptionMap(acc, sub, 1) - if updateGWs { - srv.gatewayUpdateSubInterest(acc.Name, sub, 1) - } - // Now check on leafnode updates for other leaf nodes. - srv.updateLeafNodes(acc, sub, 1) + // If we are not solicited, treat leaf node subscriptions similar to a + // client subscription, meaning we forward them to routes, gateways and + // other leaf nodes as needed. + if !solicited { + // If we are routing add to the route map for the associated account. + srv.updateRouteSubscriptionMap(acc, sub, 1) + if updateGWs { + srv.gatewayUpdateSubInterest(acc.Name, sub, 1) + } + // Now check on leafnode updates for other leaf nodes. 
+ srv.updateLeafNodes(acc, sub, 1) + } return nil } diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 2121d29e..e21ee579 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -40,7 +40,6 @@ func createLeafConn(t tLogger, host string, port int) net.Conn { func testDefaultOptionsForLeafNodes() *server.Options { o := DefaultTestOptions - o.Host = "127.0.0.1" o.Port = -1 o.LeafNode.Host = o.Host o.LeafNode.Port = -1 @@ -64,7 +63,6 @@ func runSolicitLeafServer(lso *server.Options) (*server.Server, *server.Options) func runSolicitLeafServerToURL(surl string) (*server.Server, *server.Options) { o := DefaultTestOptions - o.Host = "127.0.0.1" o.Port = -1 rurl, _ := url.Parse(surl) o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}} @@ -419,9 +417,11 @@ func TestLeafNodeAndRoutes(t *testing.T) { expect(pongRe) expectNothing(t, lc) - send("UNSUB 2\r\n") + send("UNSUB 2\r\nPING\r\n") + expect(pongRe) expectNothing(t, lc) - send("UNSUB 1\r\n") + send("UNSUB 1\r\nPING\r\n") + expect(pongRe) leafExpect(lunsubRe) // Now put it back and test msg flow. 
@@ -522,7 +522,6 @@ type cluster struct { func testDefaultClusterOptionsForLeafNodes() *server.Options { o := DefaultTestOptions - o.Host = "127.0.0.1" o.Port = -1 o.Cluster.Host = o.Host o.Cluster.Port = -1 @@ -928,7 +927,6 @@ func TestLeafNodeBasicAuth(t *testing.T) { func runTLSSolicitLeafServer(lso *server.Options) (*server.Server, *server.Options) { o := DefaultTestOptions - o.Host = "127.0.0.1" o.Port = -1 rurl, _ := url.Parse(fmt.Sprintf("nats-leaf://%s:%d", lso.LeafNode.Host, lso.LeafNode.Port)) remote := &server.RemoteLeafOpts{URLs: []*url.URL{rurl}} @@ -2388,7 +2386,6 @@ func TestLeafNodeAndGatewayGlobalRouting(t *testing.T) { defer ncl.Close() ncl.Subscribe("foo", func(m *nats.Msg) { - fmt.Printf("Reply is %v\n", m.Reply) m.Respond([]byte("World")) }) @@ -2482,3 +2479,156 @@ func TestLeafNodeMultipleRemoteURLs(t *testing.T) { checkLeafNodeConnected(t, s) } + +func runSolicitLeafCluster(t *testing.T, clusterName string, d1, d2 *cluster) *cluster { + c := &cluster{servers: make([]*server.Server, 0, 2), opts: make([]*server.Options, 0, 2), name: clusterName} + + // Who we will solicit for server 1 + ci := rand.Intn(len(d1.opts)) + opts := d1.opts[ci] + surl := fmt.Sprintf("nats-leaf://%s:%d", opts.LeafNode.Host, opts.LeafNode.Port) + + o := DefaultTestOptions + o.Port = -1 + rurl, _ := url.Parse(surl) + o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}} + o.LeafNode.ReconnectInterval = 100 * time.Millisecond + o.Cluster.Host = o.Host + o.Cluster.Port = -1 + s := RunServer(&o) + checkLeafNodeConnected(t, d1.servers[ci]) + + c.servers = append(c.servers, s) + c.opts = append(c.opts, &o) + + // Grab route info + routeAddr := fmt.Sprintf("nats-route://%s:%d", o.Cluster.Host, o.Cluster.Port) + curl, _ := url.Parse(routeAddr) + + // Who we will solicit for server 2 + ci = rand.Intn(len(d2.opts)) + opts = d2.opts[ci] + surl = fmt.Sprintf("nats-leaf://%s:%d", opts.LeafNode.Host, opts.LeafNode.Port) + + // This is for the case where d1 == d2 and 
we select the same server. + plfn := d2.servers[ci].NumLeafNodes() + + o2 := DefaultTestOptions + o2.Port = -1 + rurl, _ = url.Parse(surl) + o2.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}} + o2.LeafNode.ReconnectInterval = 100 * time.Millisecond + o2.Cluster.Host = o.Host + o2.Cluster.Port = -1 + o2.Routes = []*url.URL{curl} + s = RunServer(&o2) + + if plfn == 0 { + checkLeafNodeConnected(t, d2.servers[ci]) + } else { + checkLeafNode2Connected(t, d2.servers[ci]) + } + + c.servers = append(c.servers, s) + c.opts = append(c.opts, &o2) + + checkClusterFormed(t, c.servers...) + + return c +} + +func clientForCluster(t *testing.T, c *cluster) *nats.Conn { + t.Helper() + opts := c.opts[rand.Intn(len(c.opts))] + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + nc, err := nats.Connect(url) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + return nc +} + +func TestLeafNodeCycleWithSolicited(t *testing.T) { + server.SetGatewaysSolicitDelay(10 * time.Millisecond) + defer server.ResetGatewaysSolicitDelay() + + // Accepting leafnode cluster, e.g. NGS + ca := createClusterWithName(t, "A", 3) + defer shutdownCluster(ca) + cb := createClusterWithName(t, "B", 3, ca) + defer shutdownCluster(cb) + + // Create the responders. 
+ requestsReceived := int32(0) + + nc := clientForCluster(t, ca) + defer nc.Close() + nc.QueueSubscribe("request", "cycles", func(m *nats.Msg) { + atomic.AddInt32(&requestsReceived, 1) + m.Respond([]byte("22")) + }) + nc.Flush() + + nc = clientForCluster(t, cb) + defer nc.Close() + nc.QueueSubscribe("request", "cycles", func(m *nats.Msg) { + atomic.AddInt32(&requestsReceived, 1) + m.Respond([]byte("33")) + }) + nc.Flush() + + // Soliciting cluster, both solicited connected to the "A" cluster + sc := runSolicitLeafCluster(t, "SC", ca, ca) + defer shutdownCluster(sc) + + // Connect a client to a random server in sc + createClientAndRequest := func(c *cluster) (*nats.Conn, *nats.Subscription) { + nc := clientForCluster(t, c) + reply := nats.NewInbox() + sub, err := nc.SubscribeSync(reply) + if err != nil { + t.Fatalf("Could not subscribe: %v", err) + } + if err := nc.PublishRequest("request", reply, []byte("fingers crossed")); err != nil { + t.Fatalf("Error sending request: %v", err) + } + return nc, sub + } + + verifyOneResponse := func(sub *nats.Subscription) { + time.Sleep(250 * time.Millisecond) + m, _, err := sub.Pending() + if err != nil { + t.Fatalf("Error calling Pending(): %v", err) + } + if m > 1 { + t.Fatalf("Received more then one response, cycle indicated: %d", m) + } + } + + verifyRequestTotal := func(nre int32) { + if nr := atomic.LoadInt32(&requestsReceived); nr != nre { + t.Fatalf("Expected %d requests received, got %d", nre, nr) + } + } + + // This should pass to here, but if we have a cycle things will be spinning and we will receive + // too many responses when it should only be 1. + nc, rsub := createClientAndRequest(sc) + defer nc.Close() + verifyOneResponse(rsub) + verifyRequestTotal(1) + + // Do a solicit across GW, so shut this one down. + nc.Close() + shutdownCluster(sc) + + // Soliciting cluster, connect to different clusters across a GW. 
+ sc = runSolicitLeafCluster(t, "SC", ca, cb) + defer shutdownCluster(sc) + + nc, rsub = createClientAndRequest(sc) + defer nc.Close() + verifyOneResponse(rsub) + verifyRequestTotal(2) // This is total since use same responders. +}