Merge pull request #847 from nats-io/si

ServiceImports and queue groups
This commit is contained in:
Derek Collison
2018-12-06 18:11:23 -08:00
committed by GitHub
3 changed files with 90 additions and 0 deletions

View File

@@ -2185,6 +2185,11 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
}
// FIXME(dlc) - Do L1 cache trick from above.
rr := rm.acc.sl.Match(rm.to)
// If we are a route or gateway and this message is flipped to a queue subscriber we
// need to handle that since the processMsgResults will want a queue filter.
if c.kind == ROUTER || c.kind == GATEWAY && c.pa.queues == nil && len(rr.qsubs) > 0 {
c.makeQFilter(rr.qsubs)
}
c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, nil)
// If this is not a gateway connection but gateway is enabled,
// try to send this converted message to all gateways.

View File

@@ -311,6 +311,18 @@ func (c *client) processInboundRoutedMsg(msg []byte) {
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, nil)
}
// Helper function for routes and gateways to create qfilters need for
// converted subs from imports, etc.
func (c *client) makeQFilter(qsubs [][]*subscription) {
qs := make([][]byte, 0, len(qsubs))
for _, qsub := range qsubs {
if len(qsub) > 0 {
qs = append(qs, qsub[0].queue)
}
}
c.pa.queues = qs
}
// Lock should be held entering here.
func (c *client) sendConnect(tlsRequired bool) {
var user, pass string

View File

@@ -1236,6 +1236,79 @@ func TestNewRouteServiceImport(t *testing.T) {
}
}
func TestNewRouteServiceImportQueueGroups(t *testing.T) {
srvA, srvB, optsA, optsB := runServers(t)
defer srvA.Shutdown()
defer srvB.Shutdown()
// Do Accounts for the servers.
fooA, barA := registerAccounts(t, srvA)
fooB, barB := registerAccounts(t, srvB)
// Add export to both.
addServiceExport("test.request", isPublic, fooA, fooB)
// Add import abilities to server B's bar account from foo.
if err := barB.AddServiceImport(fooB, "foo.request", "test.request"); err != nil {
t.Fatalf("Error adding service import: %v", err)
}
// Do same on A.
if err := barA.AddServiceImport(fooA, "foo.request", "test.request"); err != nil {
t.Fatalf("Error adding service import: %v", err)
}
// clientA will be connected to srvA and be the service endpoint and responder.
clientA := createClientConn(t, optsA.Host, optsA.Port)
defer clientA.Close()
sendA, expectA := setupConnWithAccount(t, clientA, "$foo")
sendA("SUB test.request QGROUP 1\r\nPING\r\n")
expectA(pongRe)
// Now setup client B on srvB who will do a sub from account $bar
// that should map account $foo's foo subject.
clientB := createClientConn(t, optsB.Host, optsB.Port)
defer clientB.Close()
sendB, expectB := setupConnWithAccount(t, clientB, "$bar")
sendB("SUB reply QGROUP_TOO 1\r\nPING\r\n")
expectB(pongRe)
// Send the request from clientB on foo.request,
sendB("PUB foo.request reply 2\r\nhi\r\nPING\r\n")
expectB(pongRe)
expectMsgsA := expectMsgsCommand(t, expectA)
expectMsgsB := expectMsgsCommand(t, expectB)
// Expect the request on A
matches := expectMsgsA(1)
reply := string(matches[0][replyIndex])
checkMsg(t, matches[0], "test.request", "1", reply, "2", "hi")
if reply == "reply" {
t.Fatalf("Expected randomized reply, but got original")
}
sendA(fmt.Sprintf("PUB %s 2\r\nok\r\nPING\r\n", reply))
expectA(pongRe)
matches = expectMsgsB(1)
checkMsg(t, matches[0], "reply", "1", "", "2", "ok")
if ts := fooA.TotalSubs(); ts != 1 {
t.Fatalf("Expected one sub to be left on fooA, but got %d", ts)
}
routez, _ := srvA.Routez(&server.RoutezOptions{Subscriptions: true})
r := routez.Routes[0]
if r == nil {
t.Fatalf("Expected 1 route, got none")
}
if r.NumSubs != 1 {
t.Fatalf("Expected 1 sub in the route connection, got %v", r.NumSubs)
}
}
func TestNewRouteServiceImportDanglingRemoteSubs(t *testing.T) {
srvA, srvB, optsA, optsB := runServers(t)
defer srvA.Shutdown()