diff --git a/server/client.go b/server/client.go index fd371d1d..5cd0ae68 100644 --- a/server/client.go +++ b/server/client.go @@ -2133,22 +2133,21 @@ func (c *client) processInboundClientMsg(msg []byte) { c.checkForImportServices(c.acc, msg) } - var qa [16][]byte - queues := qa[:0] + var qnames [][]byte // Check for no interest, short circuit if so. // This is the fanout scale. if len(r.psubs)+len(r.qsubs) > 0 { - var qnames *[][]byte + var collect bool // If we have queue subs in this cluster, then if we run in gateway // mode and the remote gateways have queue subs, then we need to // collect the queue groups this message was sent to so that we // exclude them when sending to gateways. if len(r.qsubs) > 0 && c.srv.gateway.enabled && atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 { - qnames = &queues + collect = true } - c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, qnames) + qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, collect) } // Now deal with gateways @@ -2161,7 +2160,7 @@ func (c *client) processInboundClientMsg(msg []byte) { if len(r.psubs) == 1 && r.psubs[0].client.gw != nil { c.sendReplyMsgDirectToGateway(c.acc, r.psubs[0], msg) } else { - c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, queues) + c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, qnames) } } } @@ -2195,7 +2194,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) { if c.kind == ROUTER || c.kind == GATEWAY && c.pa.queues == nil && len(rr.qsubs) > 0 { c.makeQFilter(rr.qsubs) } - c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, nil) + c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, false) // If this is not a gateway connection but gateway is enabled, // try to send this converted message to all gateways. if c.kind != GATEWAY && c.srv.gateway.enabled { @@ -2240,7 +2239,8 @@ func (c *client) addSubToRouteTargets(sub *subscription) { } // This processes the sublist results for a given message. -func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, queues *[][]byte) { +func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, collect bool) [][]byte { + var queues [][]byte // msg header for clients. msgh := c.msgb[1:msgHeadProtoLen] msgh = append(msgh, subject...) @@ -2283,7 +2283,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // If we are sourced from a route we need to have direct filtered queues. if c.kind == ROUTER && c.pa.queues == nil { - return + return queues } // Set these up to optionally filter based on the queue lists. @@ -2346,8 +2346,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, continue } else { c.addSubToRouteTargets(sub) - if queues != nil { - *queues = append(*queues, sub.queue) + if collect { + queues = append(queues, sub.queue) } } break @@ -2366,8 +2366,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, if c.deliverMsg(sub, mh, msg) { // Clear rsub rsub = nil - if queues != nil { - *queues = append(*queues, sub.queue) + if collect { + queues = append(queues, sub.queue) } break } @@ -2377,8 +2377,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // If we are here we tried to deliver to a local qsub // but failed. So we will send it to a remote. c.addSubToRouteTargets(rsub) - if queues != nil { - *queues = append(*queues, rsub.queue) + if collect { + queues = append(queues, rsub.queue) } } } @@ -2387,7 +2387,7 @@ sendToRoutes: // If no messages for routes return here. if len(c.in.rts) == 0 { - return + return queues } // We address by index to avoid struct copy. @@ -2418,6 +2418,7 @@ sendToRoutes: mh = append(mh, _CRLF_...) c.deliverMsg(rt.sub, mh, msg) } + return queues } func (c *client) pubPermissionViolation(subject []byte) { diff --git a/server/const.go b/server/const.go index fb99d81d..8ee1ae91 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.0.0-beta.6" + VERSION = "2.0.0-beta.7" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/gateway.go b/server/gateway.go index de398a69..efec1c15 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2174,7 +2174,7 @@ func (c *client) processInboundGatewayMsg(msg []byte) { } } - c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, nil) + c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false) } // Indicates that the remote which we are sending messages to diff --git a/server/route.go b/server/route.go index 2f422e36..ec18e674 100644 --- a/server/route.go +++ b/server/route.go @@ -308,7 +308,7 @@ func (c *client) processInboundRoutedMsg(msg []byte) { } } - c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, nil) + c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false) } // Helper function for routes and gateways to create qfilters need for