mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Update based on comments
Do the swapping to outbound connection only on send. It means that those subs are stored in the inbound connection and those are the only type of subs stored there. So on connection close it is easy to clean them up. Also instead of having processMsgResults have to return this sub, simply check the size of r.psubs and if 1, the type of client associated with it. If gateway, we know we have to do the direct send. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -2118,8 +2118,6 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
var qa [16][]byte
|
||||
queues := qa[:0]
|
||||
|
||||
var replySub *subscription
|
||||
|
||||
// Check for no interest, short circuit if so.
|
||||
// This is the fanout scale.
|
||||
if len(r.psubs)+len(r.qsubs) > 0 {
|
||||
@@ -2132,17 +2130,18 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
|
||||
qnames = &queues
|
||||
}
|
||||
replySub = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, qnames)
|
||||
c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, qnames)
|
||||
}
|
||||
|
||||
// Now deal with gateways
|
||||
if c.srv.gateway.enabled {
|
||||
// TODO(ik): Need to revisit all that.
|
||||
// If replySub is not nil it means that this is
|
||||
// a reply sent on the _R_ subject and associated with
|
||||
// an outbound connection. Send direct here.
|
||||
if replySub != nil {
|
||||
c.sendReplyMsgDirectToGateway(c.acc, replySub, msg)
|
||||
// If we have a single match and the sub's client connection
|
||||
// is a gateway, we know it is a message sent to a "_R_." subject
|
||||
// (since this is the only time we add a sub with a GW client
|
||||
// into an account sublist). In that case, do a direct send.
|
||||
if len(r.psubs) == 1 && r.psubs[0].client.gw != nil {
|
||||
c.sendReplyMsgDirectToGateway(c.acc, r.psubs[0], msg)
|
||||
} else {
|
||||
c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, queues)
|
||||
}
|
||||
@@ -2218,10 +2217,7 @@ func (c *client) addSubToRouteTargets(sub *subscription) {
|
||||
}
|
||||
|
||||
// This processes the sublist results for a given message.
|
||||
// If the incoming message was for a service reply (subject starts with `_R_.`)
|
||||
// and the sub's bound client is a gateway (will only be 1), then return
|
||||
// this subscription so that it can be sent direct to GW.
|
||||
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, queues *[][]byte) (replySub *subscription) {
|
||||
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, queues *[][]byte) {
|
||||
// msg header for clients.
|
||||
msgh := c.msgb[1:msgHeadProtoLen]
|
||||
msgh = append(msgh, subject...)
|
||||
@@ -2246,14 +2242,6 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
continue
|
||||
} else if sub.client.kind == GATEWAY {
|
||||
// Never send to gateway from here.
|
||||
if c.kind == GATEWAY {
|
||||
continue
|
||||
}
|
||||
// The only case we can be here is if this message
|
||||
// is a reply sent on the _R_.xxx subject. We don't
|
||||
// send here, return this sub so that processInboundClientMsg
|
||||
// can send direct to GW.
|
||||
replySub = sub
|
||||
continue
|
||||
}
|
||||
// Check for stream import mapped subs. These apply to local subs only.
|
||||
@@ -2407,7 +2395,6 @@ sendToRoutes:
|
||||
mh = append(mh, _CRLF_...)
|
||||
c.deliverMsg(rt.sub, mh, msg)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *client) pubPermissionViolation(subject []byte) {
|
||||
|
||||
@@ -1420,27 +1420,32 @@ func (s *Server) removeRemoteGatewayConnection(c *client) {
|
||||
s.removeFromTempClients(cid)
|
||||
|
||||
if isOutbound {
|
||||
var subsa [1024]*subscription
|
||||
var subs = subsa[:0]
|
||||
|
||||
// Update number of totalQSubs for this gateway
|
||||
qSubsRemoved := int64(0)
|
||||
c.mu.Lock()
|
||||
for _, sub := range c.subs {
|
||||
if sub.queue != nil {
|
||||
qSubsRemoved++
|
||||
} else if sub.qw > 0 {
|
||||
// Hack to track _R_ reply subs that need
|
||||
// removal.
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
// Update total count of qsubs in remote gateways.
|
||||
atomic.AddInt64(&c.srv.gateway.totalQSubs, -qSubsRemoved)
|
||||
|
||||
} else {
|
||||
var subsa [1024]*subscription
|
||||
var subs = subsa[:0]
|
||||
|
||||
// For inbound GW connection, if we have subs, those are
|
||||
// local subs on "_R_." subjects.
|
||||
c.mu.Lock()
|
||||
for _, sub := range c.subs {
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
for _, sub := range subs {
|
||||
c.removeReplySub(sub)
|
||||
}
|
||||
// Update total count of qsubs in remote gateways.
|
||||
atomic.AddInt64(&c.srv.gateway.totalQSubs, qSubsRemoved*-1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1909,8 +1914,14 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
|
||||
}
|
||||
|
||||
// Invoked by a PUB's connection to send a reply message on _R_ directly
|
||||
// to the outbound gateway connection (that is referenced in sub.client)
|
||||
// to the gateway connection.
|
||||
func (c *client) sendReplyMsgDirectToGateway(acc *Account, sub *subscription, msg []byte) {
|
||||
// The sub.client references the inbound connection, so we need to
|
||||
// swap to the outbound connection here.
|
||||
inbound := sub.client
|
||||
outbound := c.srv.getOutboundGatewayConnection(inbound.gw.name)
|
||||
sub.client = outbound
|
||||
|
||||
mh := c.msgb[:msgHeadProtoLen]
|
||||
mh = append(mh, acc.Name...)
|
||||
mh = append(mh, ' ')
|
||||
@@ -1919,9 +1930,9 @@ func (c *client) sendReplyMsgDirectToGateway(acc *Account, sub *subscription, ms
|
||||
mh = append(mh, c.pa.szb...)
|
||||
mh = append(mh, CR_LF...)
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
// cleanup, use sub.client here, not `c` which is not the
|
||||
// gateway connection but instead a PUB's connection.
|
||||
sub.client.removeReplySub(sub)
|
||||
// Cleanup. Since the sub is stored in the inbound, use that to
|
||||
// call this function.
|
||||
inbound.removeReplySub(sub)
|
||||
}
|
||||
|
||||
// May send a message to all outbound gateways. It is possible
|
||||
@@ -2118,30 +2129,15 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
|
||||
// Copy off the reply since otherwise we are referencing a buffer that will be reused.
|
||||
reply := make([]byte, len(c.pa.reply))
|
||||
copy(reply, c.pa.reply)
|
||||
// If the connection is a GW connection, it is necessarily an
|
||||
// inbound connection. We will switch the the outbound GW connectiom
|
||||
// instead.
|
||||
conn := c
|
||||
sub := &subscription{client: conn, subject: reply, sid: sid, max: 1}
|
||||
if c.srv.gateway.enabled && c.gw != nil {
|
||||
conn = c.srv.getOutboundGatewayConnection(c.gw.name)
|
||||
if conn == nil {
|
||||
c.Errorf("Did not find outbound connection for %q", c.gw.name)
|
||||
} else {
|
||||
sub.client = conn
|
||||
// TODO(ik): Biggest hack ever, since this is not a queue
|
||||
// use `qw` to mark this sub for cleanup on connection close.
|
||||
sub.qw = 1
|
||||
}
|
||||
}
|
||||
sub := &subscription{client: c, subject: reply, sid: sid, max: 1}
|
||||
if err := acc.sl.Insert(sub); err != nil {
|
||||
c.Errorf("Could not insert subscription: %v", err)
|
||||
} else {
|
||||
ttl := acc.AutoExpireTTL()
|
||||
conn.mu.Lock()
|
||||
conn.subs[string(sid)] = sub
|
||||
conn.addReplySubTimeout(acc, sub, ttl)
|
||||
conn.mu.Unlock()
|
||||
c.mu.Lock()
|
||||
c.subs[string(sid)] = sub
|
||||
c.addReplySubTimeout(acc, sub, ttl)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user