From 08ca7b5f2ae451b0de956c7db25761375515905d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 6 Dec 2018 17:52:17 -0800 Subject: [PATCH] ServiceImports and queue groups Signed-off-by: Derek Collison --- server/client.go | 5 +++ server/route.go | 12 +++++++ test/new_routes_test.go | 73 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+) diff --git a/server/client.go b/server/client.go index 5a2163ad..ff7771c3 100644 --- a/server/client.go +++ b/server/client.go @@ -2185,6 +2185,11 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) { } // FIXME(dlc) - Do L1 cache trick from above. rr := rm.acc.sl.Match(rm.to) + // If we are a route or gateway and this message is flipped to a queue subscriber we + // need to handle that since the processMsgResults will want a queue filter. + if c.kind == ROUTER || c.kind == GATEWAY && c.pa.queues == nil && len(rr.qsubs) > 0 { + c.makeQFilter(rr.qsubs) + } c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, nil) // If this is not a gateway connection but gateway is enabled, // try to send this converted message to all gateways. diff --git a/server/route.go b/server/route.go index b87b68de..8a0e78bb 100644 --- a/server/route.go +++ b/server/route.go @@ -311,6 +311,18 @@ func (c *client) processInboundRoutedMsg(msg []byte) { c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, nil) } +// Helper function for routes and gateways to create qfilters need for +// converted subs from imports, etc. +func (c *client) makeQFilter(qsubs [][]*subscription) { + qs := make([][]byte, 0, len(qsubs)) + for _, qsub := range qsubs { + if len(qsub) > 0 { + qs = append(qs, qsub[0].queue) + } + } + c.pa.queues = qs +} + // Lock should be held entering here. func (c *client) sendConnect(tlsRequired bool) { var user, pass string diff --git a/test/new_routes_test.go b/test/new_routes_test.go index 06c3da72..ed1f3241 100644 --- a/test/new_routes_test.go +++ b/test/new_routes_test.go @@ -1236,6 +1236,79 @@ func TestNewRouteServiceImport(t *testing.T) { } } +func TestNewRouteServiceImportQueueGroups(t *testing.T) { + srvA, srvB, optsA, optsB := runServers(t) + defer srvA.Shutdown() + defer srvB.Shutdown() + + // Do Accounts for the servers. + fooA, barA := registerAccounts(t, srvA) + fooB, barB := registerAccounts(t, srvB) + + // Add export to both. + addServiceExport("test.request", isPublic, fooA, fooB) + + // Add import abilities to server B's bar account from foo. + if err := barB.AddServiceImport(fooB, "foo.request", "test.request"); err != nil { + t.Fatalf("Error adding service import: %v", err) + } + // Do same on A. + if err := barA.AddServiceImport(fooA, "foo.request", "test.request"); err != nil { + t.Fatalf("Error adding service import: %v", err) + } + + // clientA will be connected to srvA and be the service endpoint and responder. + clientA := createClientConn(t, optsA.Host, optsA.Port) + defer clientA.Close() + + sendA, expectA := setupConnWithAccount(t, clientA, "$foo") + sendA("SUB test.request QGROUP 1\r\nPING\r\n") + expectA(pongRe) + + // Now setup client B on srvB who will do a sub from account $bar + // that should map account $foo's foo subject. + clientB := createClientConn(t, optsB.Host, optsB.Port) + defer clientB.Close() + + sendB, expectB := setupConnWithAccount(t, clientB, "$bar") + sendB("SUB reply QGROUP_TOO 1\r\nPING\r\n") + expectB(pongRe) + + // Send the request from clientB on foo.request, + sendB("PUB foo.request reply 2\r\nhi\r\nPING\r\n") + expectB(pongRe) + + expectMsgsA := expectMsgsCommand(t, expectA) + expectMsgsB := expectMsgsCommand(t, expectB) + + // Expect the request on A + matches := expectMsgsA(1) + reply := string(matches[0][replyIndex]) + checkMsg(t, matches[0], "test.request", "1", reply, "2", "hi") + if reply == "reply" { + t.Fatalf("Expected randomized reply, but got original") + } + + sendA(fmt.Sprintf("PUB %s 2\r\nok\r\nPING\r\n", reply)) + expectA(pongRe) + + matches = expectMsgsB(1) + checkMsg(t, matches[0], "reply", "1", "", "2", "ok") + + if ts := fooA.TotalSubs(); ts != 1 { + t.Fatalf("Expected one sub to be left on fooA, but got %d", ts) + } + + routez, _ := srvA.Routez(&server.RoutezOptions{Subscriptions: true}) + r := routez.Routes[0] + if r == nil { + t.Fatalf("Expected 1 route, got none") + } + if r.NumSubs != 1 { + t.Fatalf("Expected 1 sub in the route connection, got %v", r.NumSubs) + } +} + func TestNewRouteServiceImportDanglingRemoteSubs(t *testing.T) { srvA, srvB, optsA, optsB := runServers(t) defer srvA.Shutdown()