Merge pull request #3412 from nats-io/fix_sys_client_gw

[FIXED] Gateway: system request/replies may not work properly
This commit is contained in:
Ivan Kozlovic
2022-08-29 14:29:28 -06:00
committed by GitHub

View File

@@ -164,6 +164,13 @@ type srvGateway struct {
// This is to track recent subscriptions for a given connection
rsubs sync.Map
// This client will be used for SYSTEM clients when storing/looking up
// recent subscriptions in rsubs. This is because some code may not
// use the same actual *client object for SYSTEM client. For instance
// a raft node creates an internal client, that would be used to store
// the subscription in rsubs, but the sending part that checks for rsubs
// is using an internal client created in sendq.go's internalLoop.
sysCli *client
resolver netResolver // Used to resolve host name before calling net.Dial()
sqbsz int // Max buffer size to send queue subs protocol. Used for testing.
@@ -359,6 +366,7 @@ func (s *Server) newGateway(opts *Options) error {
resolver: opts.Gateway.resolver,
runknown: opts.Gateway.RejectUnknown,
oldHash: getOldHash(opts.Gateway.Name),
sysCli: &client{},
}
gateway.Lock()
defer gateway.Unlock()
@@ -2360,6 +2368,9 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
if sub.client != nil {
rsubs := &s.gateway.rsubs
c := sub.client
if c.kind == SYSTEM {
c = s.gateway.sysCli
}
sli, _ := rsubs.Load(c)
if change > 0 {
var sl *Sublist
@@ -2425,6 +2436,8 @@ func (g *srvGateway) shouldMapReplyForGatewaySend(c *client, acc *Account, reply
acc.mu.Lock()
c = acc.internalClient()
acc.mu.Unlock()
} else if c.kind == SYSTEM {
c = g.sysCli
}
// If for this client there is a recent matching subscription interest
// then we will map.