diff --git a/server/gateway.go b/server/gateway.go index 2a7de885..4cab3303 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -164,6 +164,13 @@ type srvGateway struct { // This is to track recent subscriptions for a given connection rsubs sync.Map + // This client will be used for SYSTEM clients when storing/looking up + // recent subscriptions in rsubs. This is because some code may not + // use the same actual *client object for SYSTEM client. For instance + // a raft node creates an internal client, that would be used to store + // the subscription in rsubs, but the sending part that checks for rsubs + // is using an internal client created in sendq.go's internalLoop. + sysCli *client resolver netResolver // Used to resolve host name before calling net.Dial() sqbsz int // Max buffer size to send queue subs protocol. Used for testing. @@ -359,6 +366,7 @@ func (s *Server) newGateway(opts *Options) error { resolver: opts.Gateway.resolver, runknown: opts.Gateway.RejectUnknown, oldHash: getOldHash(opts.Gateway.Name), + sysCli: &client{}, } gateway.Lock() defer gateway.Unlock() @@ -2360,6 +2368,9 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha if sub.client != nil { rsubs := &s.gateway.rsubs c := sub.client + if c.kind == SYSTEM { + c = s.gateway.sysCli + } sli, _ := rsubs.Load(c) if change > 0 { var sl *Sublist @@ -2425,6 +2436,8 @@ func (g *srvGateway) shouldMapReplyForGatewaySend(c *client, acc *Account, reply acc.mu.Lock() c = acc.internalClient() acc.mu.Unlock() + } else if c.kind == SYSTEM { + c = g.sysCli } // If for this client there is a recent matching subscription interest // then we will map.