From 57403b1903f5c4b29ed45e1e18416da0490cf021 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 24 Feb 2021 19:28:46 -0800 Subject: [PATCH] When pulling consumer messages if the consumer and the client were on the same server we needed to process local deliver subject. Signed-off-by: Derek Collison --- server/client.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/server/client.go b/server/client.go index 3139789f..c5fb39b8 100644 --- a/server/client.go +++ b/server/client.go @@ -3742,12 +3742,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt } // Pick correct to subject. If we matched on a wildcard use the literal publish subject. - to := si.to + to, subject := si.to, string(c.pa.subject) if si.tr != nil { // FIXME(dlc) - This could be slow, may want to look at adding cache to bare transforms? - to, _ = si.tr.transformSubject(string(c.pa.subject)) + to, _ = si.tr.transformSubject(subject) } else if si.usePub { - to = string(c.pa.subject) + to = subject } // Now check to see if this account has mappings that could affect the service import. // Can't use non-locked trick like in processInboundClientMsg, so just call into selectMappedSubject @@ -3814,15 +3814,21 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt var didDeliver bool + // Remap to local deliver subject if set. + dsubj := []byte(to) + if len(c.pa.deliver) > 0 { + dsubj = c.pa.deliver + } + // If this is not a gateway connection but gateway is enabled, // try to send this converted message to all gateways. if c.srv.gateway.enabled { flags |= pmrCollectQueueNames var queues [][]byte - didDeliver, queues = c.processMsgResults(si.acc, rr, msg, nil, []byte(to), nrr, flags) - didDeliver = c.sendMsgToGateways(si.acc, msg, []byte(to), nrr, queues) || didDeliver + didDeliver, queues = c.processMsgResults(si.acc, rr, msg, nil, dsubj, nrr, flags) + didDeliver = c.sendMsgToGateways(si.acc, msg, dsubj, nrr, queues) || didDeliver } else { - didDeliver, _ = c.processMsgResults(si.acc, rr, msg, nil, []byte(to), nrr, flags) + didDeliver, _ = c.processMsgResults(si.acc, rr, msg, nil, dsubj, nrr, flags) } // Put what was there back now.