Fix for leafnode and dq selection over GWs

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-06-01 13:00:35 -07:00
parent 9bb357e32a
commit 2a8e630bf1
4 changed files with 102 additions and 4 deletions

View File

@@ -644,6 +644,9 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool {
return true
}
// FIXME(dlc) - Add ability to support remote account bindings via
// other auth like user or nkey and tlsMapping.
// For now this means we are binding the leafnode to the global account.
c.registerWithAccount(s.globalAccount())

View File

@@ -2355,9 +2355,9 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
// FIXME(dlc) - Do L1 cache trick from above.
rr := rm.acc.sl.Match(rm.to)
// If we are a route or gateway and this message is flipped to a queue subscriber we
// If we are a route or gateway or leafnode and this message is flipped to a queue subscriber we
// need to handle that since the processMsgResults will want a queue filter.
if (c.kind == ROUTER || c.kind == GATEWAY) && c.pa.queues == nil && len(rr.qsubs) > 0 {
if (c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF) && c.pa.queues == nil && len(rr.qsubs) > 0 {
c.makeQFilter(rr.qsubs)
}

View File

@@ -40,7 +40,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.0.0-RC17"
VERSION = "2.0.0-RC18"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -17,6 +17,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"math/rand"
"net"
"net/url"
"os"
@@ -58,10 +59,14 @@ func runLeafServerOnPort(port int) (*server.Server, *server.Options) {
}
func runSolicitLeafServer(lso *server.Options) (*server.Server, *server.Options) {
return runSolicitLeafServerToURL(fmt.Sprintf("nats-leaf://%s:%d", lso.LeafNode.Host, lso.LeafNode.Port))
}
func runSolicitLeafServerToURL(surl string) (*server.Server, *server.Options) {
o := DefaultTestOptions
o.Host = "127.0.0.1"
o.Port = -1
rurl, _ := url.Parse(fmt.Sprintf("nats-leaf://%s:%d", lso.LeafNode.Host, lso.LeafNode.Port))
rurl, _ := url.Parse(surl)
o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URL: rurl}}
o.LeafNode.ReconnectInterval = 100 * time.Millisecond
return RunServer(&o), &o
@@ -554,6 +559,7 @@ func waitForOutboundGateways(t *testing.T, s *server.Server, expected int, timeo
// Creates a full cluster with numServers and given name and makes sure its well formed.
// Will have Gateways and Leaf Node connections active.
func createClusterWithName(t *testing.T, clusterName string, numServers int, connectTo ...*cluster) *cluster {
t.Helper()
return createClusterEx(t, false, clusterName, numServers, connectTo...)
}
@@ -2144,3 +2150,92 @@ func TestLeafNodeSendsAccountingEvents(t *testing.T) {
t.Fatal("Did not get correctly formatted event")
}
}
func TestLeadNodeDistributedQueueAcrossGWs(t *testing.T) {
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
defer server.ResetGatewaysSolicitDelay()
ca := createClusterEx(t, true, "A", 3)
defer shutdownCluster(ca)
cb := createClusterEx(t, true, "B", 3, ca)
defer shutdownCluster(cb)
cc := createClusterEx(t, true, "C", 3, ca, cb)
defer shutdownCluster(cc)
// Create queue subscribers
createQS := func(c *cluster) *nats.Conn {
t.Helper()
opts := c.opts[rand.Intn(len(c.opts))]
url := fmt.Sprintf("nats://ngs:pass@%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
nc.QueueSubscribe("ngs.usage.*", "dq", func(m *nats.Msg) {
m.Respond([]byte(c.name))
})
nc.Flush()
return nc
}
ncA := createQS(ca)
defer ncA.Close()
ncB := createQS(cb)
defer ncB.Close()
ncC := createQS(cc)
defer ncC.Close()
connectAndRequest := func(url, clusterName string, nreqs int) {
t.Helper()
nc, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
for i := 0; i < nreqs; i++ {
m, err := nc.Request("ngs.usage", nil, 500*time.Millisecond)
if err != nil {
t.Fatalf("Did not receive a response: %v", err)
}
if string(m.Data) != clusterName {
t.Fatalf("Expected to prefer %q, but got response from %q", clusterName, m.Data)
}
}
}
checkClientDQ := func(c *cluster, nreqs int) {
t.Helper()
// Pick one at random.
opts := c.opts[rand.Intn(len(c.opts))]
url := fmt.Sprintf("nats://dlc:pass@%s:%d", opts.Host, opts.Port)
connectAndRequest(url, c.name, nreqs)
}
// First check that this works with direct connected clients.
checkClientDQ(ca, 100)
checkClientDQ(cb, 100)
checkClientDQ(cc, 100)
createLNS := func(c *cluster) (*server.Server, *server.Options) {
t.Helper()
// Pick one at random.
s, opts := runSolicitLeafServer(c.opts[rand.Intn(len(c.servers))])
checkLeafNodeConnected(t, s)
return s, opts
}
checkLeafDQ := func(opts *server.Options, clusterName string, nreqs int) {
t.Helper()
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
connectAndRequest(url, clusterName, nreqs)
}
// Test leafnodes to all clusters.
for _, c := range []*cluster{ca, cb, cc} {
// Now create a leafnode on cluster.
sl, slOpts := createLNS(c)
defer sl.Shutdown()
// Now connect to the leafnode server and run test.
checkLeafDQ(slOpts, c.name, 100)
}
}