From 9d1e773e8f8fce589ea0ff4dd51e3ad697000bb9 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 29 Aug 2022 14:05:51 -0600 Subject: [PATCH] [FIXED] Gateway: system request/replies may not work properly When a subscription is recently made, gateway code ensures that if there is a reply subject, the reply is "mapped" or rewritten to allow the reply to come back to the origin cluster, regardless of subscription interest propagation. The issue was that this uses a map with a `*client` as the key but the pointer for SYSTEM clients would not always be the same, which meant that the rewrite would not happen, causing possible "loss" of replies. Signed-off-by: Ivan Kozlovic --- server/gateway.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/server/gateway.go b/server/gateway.go index 2a7de885..4cab3303 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -164,6 +164,13 @@ type srvGateway struct { // This is to track recent subscriptions for a given connection rsubs sync.Map + // This client will be used for SYSTEM clients when storing/looking up + // recent subscriptions in rsubs. This is because some code may not + // use the same actual *client object for SYSTEM client. For instance + // a raft node creates an internal client, that would be used to store + // the subscription in rsubs, but the sending part that checks for rsubs + // is using an internal client created in sendq.go's internalLoop. + sysCli *client resolver netResolver // Used to resolve host name before calling net.Dial() sqbsz int // Max buffer size to send queue subs protocol. Used for testing. @@ -359,6 +366,7 @@ func (s *Server) newGateway(opts *Options) error { resolver: opts.Gateway.resolver, runknown: opts.Gateway.RejectUnknown, oldHash: getOldHash(opts.Gateway.Name), + sysCli: &client{}, } gateway.Lock() defer gateway.Unlock() @@ -2360,6 +2368,9 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha if sub.client != nil { rsubs := &s.gateway.rsubs c := sub.client + if c.kind == SYSTEM { + c = s.gateway.sysCli + } sli, _ := rsubs.Load(c) if change > 0 { var sl *Sublist @@ -2425,6 +2436,8 @@ func (g *srvGateway) shouldMapReplyForGatewaySend(c *client, acc *Account, reply acc.mu.Lock() c = acc.internalClient() acc.mu.Unlock() + } else if c.kind == SYSTEM { + c = g.sysCli } // If for this client there is a recent matching subscription interest // then we will map.