mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
@@ -208,14 +208,19 @@ const (
|
||||
|
||||
// Used in readloop to cache hot subject lookups and group statistics.
|
||||
type readCache struct {
|
||||
// These are for clients who are bound to a single account.
|
||||
genid uint64
|
||||
results map[string]*SublistResult
|
||||
prand *rand.Rand
|
||||
msgs int
|
||||
bytes int
|
||||
subs int
|
||||
rsz int // Read buffer size
|
||||
srs int // Short reads, used for dynamic buffer resizing.
|
||||
|
||||
// This is for routes to have their own L1 as well that is account aware.
|
||||
rcache map[string]*routeCache
|
||||
|
||||
prand *rand.Rand
|
||||
msgs int
|
||||
bytes int
|
||||
subs int
|
||||
rsz int // Read buffer size
|
||||
srs int // Short reads, used for dynamic buffer resizing.
|
||||
}
|
||||
|
||||
func (c *client) String() (id string) {
|
||||
@@ -322,6 +327,7 @@ func (c *client) initClient() {
|
||||
c.ncs = fmt.Sprintf("%s - cid:%d", conn, c.cid)
|
||||
case ROUTER:
|
||||
c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid)
|
||||
c.in.rcache = make(map[string]*routeCache, 32)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1712,7 +1718,7 @@ func (c *client) processInboundMsg(msg []byte) {
|
||||
if c.typ == CLIENT {
|
||||
c.processInboundClientMsg(msg)
|
||||
} else {
|
||||
c.processInboundRouteMsg(msg)
|
||||
c.processInboundRoutedMsg(msg)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,11 +19,12 @@ import (
|
||||
|
||||
type pubArg struct {
|
||||
arg []byte
|
||||
rcache []byte
|
||||
account []byte
|
||||
queues [][]byte
|
||||
subject []byte
|
||||
reply []byte
|
||||
szb []byte
|
||||
queues [][]byte
|
||||
size int
|
||||
}
|
||||
|
||||
@@ -255,6 +256,9 @@ func (c *client) parse(buf []byte) error {
|
||||
c.processInboundMsg(c.msgBuf)
|
||||
c.argBuf, c.msgBuf = nil, nil
|
||||
c.drop, c.as, c.state = 0, i+1, OP_START
|
||||
// Drop all pub args
|
||||
c.pa.arg, c.pa.rcache, c.pa.account, c.pa.subject = nil, nil, nil, nil
|
||||
c.pa.reply, c.pa.szb, c.pa.queues = nil, nil, nil
|
||||
default:
|
||||
if c.msgBuf != nil {
|
||||
c.msgBuf = append(c.msgBuf, b)
|
||||
|
||||
@@ -214,14 +214,38 @@ func (c *client) processRoutedMsgArgs(arg []byte) error {
|
||||
// Common ones processed after check for arg length
|
||||
c.pa.account = args[0]
|
||||
c.pa.subject = args[1]
|
||||
c.pa.rcache = arg[:len(args[0])+len(args[1])+1]
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
maxRouteCacheSize = 32768
|
||||
pruneRouteCacheSize = 512
|
||||
)
|
||||
|
||||
// routeCache is for L1 semantics for inbound messages from a route to mimic the performance of clients.
|
||||
type routeCache struct {
|
||||
acc *Account
|
||||
results *SublistResult
|
||||
genid uint64
|
||||
}
|
||||
|
||||
// pruneRouteCache will prune off a random number of cache entries.
|
||||
func (c *client) pruneRouteCache() {
|
||||
n := 0
|
||||
for cacheKey := range c.in.rcache {
|
||||
delete(c.in.rcache, cacheKey)
|
||||
if n++; n > pruneRouteCacheSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processInboundRouteMsg is called to process an inbound msg from a route.
|
||||
func (c *client) processInboundRouteMsg(msg []byte) {
|
||||
func (c *client) processInboundRoutedMsg(msg []byte) {
|
||||
// Update statistics
|
||||
// The msg includes the CR_LF, so pull back out for accounting.
|
||||
c.in.msgs++
|
||||
// The msg includes the CR_LF, so pull back out for accounting.
|
||||
c.in.bytes += len(msg) - LEN_CR_LF
|
||||
|
||||
if c.trace {
|
||||
@@ -237,12 +261,43 @@ func (c *client) processInboundRouteMsg(msg []byte) {
|
||||
return
|
||||
}
|
||||
|
||||
// Match correct account and sublist.
|
||||
// We might want to make a local version to avoid any contention.
|
||||
acc := c.srv.LookupAccount(string(c.pa.account))
|
||||
if acc == nil {
|
||||
c.Debugf("Unknown account %q for routed message on subject: %q", c.pa.account, c.pa.subject)
|
||||
return
|
||||
var (
|
||||
acc *Account
|
||||
rc *routeCache
|
||||
r *SublistResult
|
||||
ok bool
|
||||
)
|
||||
|
||||
// Check our cache first.
|
||||
if rc, ok = c.in.rcache[string(c.pa.rcache)]; ok {
|
||||
// Check the genid to see if it's still valid.
|
||||
if genid := atomic.LoadUint64(&rc.acc.sl.genid); genid != rc.genid {
|
||||
ok = false
|
||||
delete(c.in.rcache, string(c.pa.rcache))
|
||||
} else {
|
||||
acc = rc.acc
|
||||
r = rc.results
|
||||
}
|
||||
}
|
||||
|
||||
if !ok {
|
||||
// Match correct account and sublist.
|
||||
acc = c.srv.LookupAccount(string(c.pa.account))
|
||||
if acc == nil {
|
||||
c.Debugf("Unknown account %q for routed message on subject: %q", c.pa.account, c.pa.subject)
|
||||
return
|
||||
}
|
||||
|
||||
// Match against the account sublist.
|
||||
r = acc.sl.Match(string(c.pa.subject))
|
||||
|
||||
// Store in our cache
|
||||
c.in.rcache[string(c.pa.rcache)] = &routeCache{acc, r, atomic.LoadUint64(&acc.sl.genid)}
|
||||
|
||||
// Check if we need to prune.
|
||||
if len(c.in.rcache) > maxRouteCacheSize {
|
||||
c.pruneRouteCache()
|
||||
}
|
||||
}
|
||||
|
||||
// Check to see if we need to map/route to another account.
|
||||
@@ -250,9 +305,6 @@ func (c *client) processInboundRouteMsg(msg []byte) {
|
||||
c.checkForImportServices(acc, msg)
|
||||
}
|
||||
|
||||
// No L1 right now for routes since they multiplex over multiple accounts.
|
||||
r := acc.sl.Match(string(c.pa.subject))
|
||||
|
||||
// Check for no interest, short circuit if so.
|
||||
// This is the fanout scale.
|
||||
if len(r.psubs)+len(r.qsubs) == 0 {
|
||||
@@ -982,6 +1034,9 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
// Initialize
|
||||
c.initClient()
|
||||
|
||||
// Initialize the route cache.
|
||||
c.in.rcache = make(map[string]*routeCache, maxRouteCacheSize)
|
||||
|
||||
if didSolicit {
|
||||
// Do this before the TLS code, otherwise, in case of failure
|
||||
// and if route is explicit, it would try to reconnect to 'nil'...
|
||||
|
||||
Reference in New Issue
Block a user