diff --git a/server/auth.go b/server/auth.go index 37ff2e98..bb710074 100644 --- a/server/auth.go +++ b/server/auth.go @@ -644,6 +644,9 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool { return true } + // FIXME(dlc) - Add ability to support remote account bindings via + // other auth like user or nkey and tlsMapping. + // For now this means we are binding the leafnode to the global account. c.registerWithAccount(s.globalAccount()) diff --git a/server/client.go b/server/client.go index 60f0686b..8fe855ce 100644 --- a/server/client.go +++ b/server/client.go @@ -2355,9 +2355,9 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) { // FIXME(dlc) - Do L1 cache trick from above. rr := rm.acc.sl.Match(rm.to) - // If we are a route or gateway and this message is flipped to a queue subscriber we + // If we are a route or gateway or leafnode and this message is flipped to a queue subscriber we // need to handle that since the processMsgResults will want a queue filter. - if (c.kind == ROUTER || c.kind == GATEWAY) && c.pa.queues == nil && len(rr.qsubs) > 0 { + if (c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF) && c.pa.queues == nil && len(rr.qsubs) > 0 { c.makeQFilter(rr.qsubs) } diff --git a/server/const.go b/server/const.go index 8d70fcf2..a14eecdb 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.0.0-RC17" + VERSION = "2.0.0-RC18" // PROTO is the currently supported protocol. // 0 was the original diff --git a/test/leafnode_test.go b/test/leafnode_test.go index bc06bd7e..d92db636 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -17,6 +17,7 @@ import ( "crypto/tls" "encoding/json" "fmt" + "math/rand" "net" "net/url" "os" @@ -58,10 +59,14 @@ func runLeafServerOnPort(port int) (*server.Server, *server.Options) { } func runSolicitLeafServer(lso *server.Options) (*server.Server, *server.Options) { + return runSolicitLeafServerToURL(fmt.Sprintf("nats-leaf://%s:%d", lso.LeafNode.Host, lso.LeafNode.Port)) +} + +func runSolicitLeafServerToURL(surl string) (*server.Server, *server.Options) { o := DefaultTestOptions o.Host = "127.0.0.1" o.Port = -1 - rurl, _ := url.Parse(fmt.Sprintf("nats-leaf://%s:%d", lso.LeafNode.Host, lso.LeafNode.Port)) + rurl, _ := url.Parse(surl) o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URL: rurl}} o.LeafNode.ReconnectInterval = 100 * time.Millisecond return RunServer(&o), &o @@ -554,6 +559,7 @@ func waitForOutboundGateways(t *testing.T, s *server.Server, expected int, timeo // Creates a full cluster with numServers and given name and makes sure its well formed. // Will have Gateways and Leaf Node connections active. func createClusterWithName(t *testing.T, clusterName string, numServers int, connectTo ...*cluster) *cluster { + t.Helper() return createClusterEx(t, false, clusterName, numServers, connectTo...) } @@ -2144,3 +2150,92 @@ func TestLeafNodeSendsAccountingEvents(t *testing.T) { t.Fatal("Did not get correctly formatted event") } } + +func TestLeadNodeDistributedQueueAcrossGWs(t *testing.T) { + server.SetGatewaysSolicitDelay(10 * time.Millisecond) + defer server.ResetGatewaysSolicitDelay() + + ca := createClusterEx(t, true, "A", 3) + defer shutdownCluster(ca) + cb := createClusterEx(t, true, "B", 3, ca) + defer shutdownCluster(cb) + cc := createClusterEx(t, true, "C", 3, ca, cb) + defer shutdownCluster(cc) + + // Create queue subscribers + createQS := func(c *cluster) *nats.Conn { + t.Helper() + opts := c.opts[rand.Intn(len(c.opts))] + url := fmt.Sprintf("nats://ngs:pass@%s:%d", opts.Host, opts.Port) + nc, err := nats.Connect(url) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + nc.QueueSubscribe("ngs.usage.*", "dq", func(m *nats.Msg) { + m.Respond([]byte(c.name)) + }) + nc.Flush() + return nc + } + + ncA := createQS(ca) + defer ncA.Close() + ncB := createQS(cb) + defer ncB.Close() + ncC := createQS(cc) + defer ncC.Close() + + connectAndRequest := func(url, clusterName string, nreqs int) { + t.Helper() + nc, err := nats.Connect(url) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + for i := 0; i < nreqs; i++ { + m, err := nc.Request("ngs.usage", nil, 500*time.Millisecond) + if err != nil { + t.Fatalf("Did not receive a response: %v", err) + } + if string(m.Data) != clusterName { + t.Fatalf("Expected to prefer %q, but got response from %q", clusterName, m.Data) + } + } + } + + checkClientDQ := func(c *cluster, nreqs int) { + t.Helper() + // Pick one at random. + opts := c.opts[rand.Intn(len(c.opts))] + url := fmt.Sprintf("nats://dlc:pass@%s:%d", opts.Host, opts.Port) + connectAndRequest(url, c.name, nreqs) + } + + // First check that this works with direct connected clients. + checkClientDQ(ca, 100) + checkClientDQ(cb, 100) + checkClientDQ(cc, 100) + + createLNS := func(c *cluster) (*server.Server, *server.Options) { + t.Helper() + // Pick one at random. + s, opts := runSolicitLeafServer(c.opts[rand.Intn(len(c.servers))]) + checkLeafNodeConnected(t, s) + return s, opts + } + + checkLeafDQ := func(opts *server.Options, clusterName string, nreqs int) { + t.Helper() + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + connectAndRequest(url, clusterName, nreqs) + } + + // Test leafnodes to all clusters. + for _, c := range []*cluster{ca, cb, cc} { + // Now create a leafnode on cluster. + sl, slOpts := createLNS(c) + defer sl.Shutdown() + // Now connect to the leafnode server and run test. + checkLeafDQ(slOpts, c.name, 100) + } +}