mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fix performance degradation introduced by GW code
This impacted even non gateway traffic Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -2133,22 +2133,21 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
c.checkForImportServices(c.acc, msg)
|
||||
}
|
||||
|
||||
var qa [16][]byte
|
||||
queues := qa[:0]
|
||||
var qnames [][]byte
|
||||
|
||||
// Check for no interest, short circuit if so.
|
||||
// This is the fanout scale.
|
||||
if len(r.psubs)+len(r.qsubs) > 0 {
|
||||
var qnames *[][]byte
|
||||
var collect bool
|
||||
// If we have queue subs in this cluster, then if we run in gateway
|
||||
// mode and the remote gateways have queue subs, then we need to
|
||||
// collect the queue groups this message was sent to so that we
|
||||
// exclude them when sending to gateways.
|
||||
if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
|
||||
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
|
||||
qnames = &queues
|
||||
collect = true
|
||||
}
|
||||
c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, qnames)
|
||||
qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, collect)
|
||||
}
|
||||
|
||||
// Now deal with gateways
|
||||
@@ -2161,7 +2160,7 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
if len(r.psubs) == 1 && r.psubs[0].client.gw != nil {
|
||||
c.sendReplyMsgDirectToGateway(c.acc, r.psubs[0], msg)
|
||||
} else {
|
||||
c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, queues)
|
||||
c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, qnames)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2195,7 +2194,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
|
||||
if c.kind == ROUTER || c.kind == GATEWAY && c.pa.queues == nil && len(rr.qsubs) > 0 {
|
||||
c.makeQFilter(rr.qsubs)
|
||||
}
|
||||
c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, nil)
|
||||
c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, false)
|
||||
// If this is not a gateway connection but gateway is enabled,
|
||||
// try to send this converted message to all gateways.
|
||||
if c.kind != GATEWAY && c.srv.gateway.enabled {
|
||||
@@ -2240,7 +2239,8 @@ func (c *client) addSubToRouteTargets(sub *subscription) {
|
||||
}
|
||||
|
||||
// This processes the sublist results for a given message.
|
||||
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, queues *[][]byte) {
|
||||
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, collect bool) [][]byte {
|
||||
var queues [][]byte
|
||||
// msg header for clients.
|
||||
msgh := c.msgb[1:msgHeadProtoLen]
|
||||
msgh = append(msgh, subject...)
|
||||
@@ -2283,7 +2283,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
|
||||
// If we are sourced from a route we need to have direct filtered queues.
|
||||
if c.kind == ROUTER && c.pa.queues == nil {
|
||||
return
|
||||
return queues
|
||||
}
|
||||
|
||||
// Set these up to optionally filter based on the queue lists.
|
||||
@@ -2346,8 +2346,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
continue
|
||||
} else {
|
||||
c.addSubToRouteTargets(sub)
|
||||
if queues != nil {
|
||||
*queues = append(*queues, sub.queue)
|
||||
if collect {
|
||||
queues = append(queues, sub.queue)
|
||||
}
|
||||
}
|
||||
break
|
||||
@@ -2366,8 +2366,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
if c.deliverMsg(sub, mh, msg) {
|
||||
// Clear rsub
|
||||
rsub = nil
|
||||
if queues != nil {
|
||||
*queues = append(*queues, sub.queue)
|
||||
if collect {
|
||||
queues = append(queues, sub.queue)
|
||||
}
|
||||
break
|
||||
}
|
||||
@@ -2377,8 +2377,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
// If we are here we tried to deliver to a local qsub
|
||||
// but failed. So we will send it to a remote.
|
||||
c.addSubToRouteTargets(rsub)
|
||||
if queues != nil {
|
||||
*queues = append(*queues, rsub.queue)
|
||||
if collect {
|
||||
queues = append(queues, rsub.queue)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2387,7 +2387,7 @@ sendToRoutes:
|
||||
|
||||
// If no messages for routes return here.
|
||||
if len(c.in.rts) == 0 {
|
||||
return
|
||||
return queues
|
||||
}
|
||||
|
||||
// We address by index to avoid struct copy.
|
||||
@@ -2418,6 +2418,7 @@ sendToRoutes:
|
||||
mh = append(mh, _CRLF_...)
|
||||
c.deliverMsg(rt.sub, mh, msg)
|
||||
}
|
||||
return queues
|
||||
}
|
||||
|
||||
func (c *client) pubPermissionViolation(subject []byte) {
|
||||
|
||||
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.0.0-beta.6"
|
||||
VERSION = "2.0.0-beta.7"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -2174,7 +2174,7 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, nil)
|
||||
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false)
|
||||
}
|
||||
|
||||
// Indicates that the remote which we are sending messages to
|
||||
|
||||
@@ -308,7 +308,7 @@ func (c *client) processInboundRoutedMsg(msg []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, nil)
|
||||
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false)
|
||||
}
|
||||
|
||||
// Helper function for routes and gateways to create qfilters need for
|
||||
|
||||
Reference in New Issue
Block a user