mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
System account was not properly tracking GW routed replies
In some cases, the reply of a request message is prefixed when going over a gateway so that if it comes back to a different server than when the request originates, it can be routed back. For system accounts, this routed reply subject was not tracked so the server would reply to the inbox and may reach a server that had not yet processed (through the route) the interest on that inbox. If the reply came with the GW routed info, that server would know to route it to the original server. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -2951,6 +2951,13 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g
|
||||
|
||||
// Check for internal subscriptions.
|
||||
if sub.icb != nil {
|
||||
if gwrply {
|
||||
// Note that we keep track of the GW routed reply in the destination
|
||||
// connection (`client`). The routed reply subject is in `c.pa.reply`,
|
||||
// should that change, we would have to pass the GW routed reply as
|
||||
// a parameter of deliverMsg().
|
||||
srv.trackGWReply(client, c.pa.reply)
|
||||
}
|
||||
client.mu.Unlock()
|
||||
// Internal account clients are for service imports and need the '\r\n'.
|
||||
if client.kind == ACCOUNT {
|
||||
@@ -2988,9 +2995,10 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g
|
||||
// header (which is c.pa.reply without the GNR routing prefix).
|
||||
if client.kind == CLIENT && len(c.pa.reply) > minReplyLen {
|
||||
if gwrply {
|
||||
// Note we keep track "in" the destination client (`client`) but the
|
||||
// routed reply subject is in `c.pa.reply`. Should that change, we
|
||||
// would have to pass the "reply" in deliverMsg().
|
||||
// Note that we keep track of the GW routed reply in the destination
|
||||
// connection (`client`). The routed reply subject is in `c.pa.reply`,
|
||||
// should that change, we would have to pass the GW routed reply as
|
||||
// a parameter of deliverMsg().
|
||||
srv.trackGWReply(client, c.pa.reply)
|
||||
}
|
||||
|
||||
|
||||
@@ -155,7 +155,9 @@ func runTrustedGateways(t *testing.T) (*Server, *Options, *Server, *Options, nke
|
||||
|
||||
sb := RunServer(optsB)
|
||||
|
||||
waitForInboundGateways(t, sa, 1, time.Second)
|
||||
waitForOutboundGateways(t, sa, 1, time.Second)
|
||||
waitForInboundGateways(t, sb, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb, 1, time.Second)
|
||||
|
||||
return sa, optsA, sb, optsB, akp
|
||||
|
||||
Reference in New Issue
Block a user