From 57904107b231e6ec8af28dc905f22cb382dffe70 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 1 Nov 2018 13:31:12 -0700 Subject: [PATCH] Added L1 cache to routes Signed-off-by: Derek Collison --- server/client.go | 20 ++++++++----- server/parser.go | 6 +++- server/route.go | 77 +++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 84 insertions(+), 19 deletions(-) diff --git a/server/client.go b/server/client.go index 2774d5b2..fcd12df6 100644 --- a/server/client.go +++ b/server/client.go @@ -208,14 +208,19 @@ const ( // Used in readloop to cache hot subject lookups and group statistics. type readCache struct { + // These are for clients who are bound to a single account. genid uint64 results map[string]*SublistResult - prand *rand.Rand - msgs int - bytes int - subs int - rsz int // Read buffer size - srs int // Short reads, used for dynamic buffer resizing. + + // This is for routes to have their own L1 as well that is account aware. + rcache map[string]*routeCache + + prand *rand.Rand + msgs int + bytes int + subs int + rsz int // Read buffer size + srs int // Short reads, used for dynamic buffer resizing. } func (c *client) String() (id string) { @@ -322,6 +327,7 @@ func (c *client) initClient() { c.ncs = fmt.Sprintf("%s - cid:%d", conn, c.cid) case ROUTER: c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid) + c.in.rcache = make(map[string]*routeCache, 32) } } @@ -1712,7 +1718,7 @@ func (c *client) processInboundMsg(msg []byte) { if c.typ == CLIENT { c.processInboundClientMsg(msg) } else { - c.processInboundRouteMsg(msg) + c.processInboundRoutedMsg(msg) } } diff --git a/server/parser.go b/server/parser.go index 79e2360c..9763e6f1 100644 --- a/server/parser.go +++ b/server/parser.go @@ -19,11 +19,12 @@ import ( type pubArg struct { arg []byte + rcache []byte account []byte - queues [][]byte subject []byte reply []byte szb []byte + queues [][]byte size int } @@ -255,6 +256,9 @@ func (c *client) parse(buf []byte) error { c.processInboundMsg(c.msgBuf) c.argBuf, c.msgBuf = nil, nil c.drop, c.as, c.state = 0, i+1, OP_START + // Drop all pub args + c.pa.arg, c.pa.rcache, c.pa.account, c.pa.subject = nil, nil, nil, nil + c.pa.reply, c.pa.szb, c.pa.queues = nil, nil, nil default: if c.msgBuf != nil { c.msgBuf = append(c.msgBuf, b) diff --git a/server/route.go b/server/route.go index 7a3c9d5c..c37a46e4 100644 --- a/server/route.go +++ b/server/route.go @@ -214,14 +214,38 @@ func (c *client) processRoutedMsgArgs(arg []byte) error { // Common ones processed after check for arg length c.pa.account = args[0] c.pa.subject = args[1] + c.pa.rcache = arg[:len(args[0])+len(args[1])+1] return nil } +const ( + maxRouteCacheSize = 32768 + pruneRouteCacheSize = 512 +) + +// routeCache is for L1 semantics for inbound messages from a route to mimic the performance of clients. +type routeCache struct { + acc *Account + results *SublistResult + genid uint64 +} + +// pruneRouteCache will prune off a random number of cache entries. +func (c *client) pruneRouteCache() { + n := 0 + for cacheKey := range c.in.rcache { + delete(c.in.rcache, cacheKey) + if n++; n > pruneRouteCacheSize { + break + } + } +} + // processInboundRouteMsg is called to process an inbound msg from a route. -func (c *client) processInboundRouteMsg(msg []byte) { +func (c *client) processInboundRoutedMsg(msg []byte) { // Update statistics - // The msg includes the CR_LF, so pull back out for accounting. c.in.msgs++ + // The msg includes the CR_LF, so pull back out for accounting. c.in.bytes += len(msg) - LEN_CR_LF if c.trace { @@ -237,12 +261,43 @@ func (c *client) processInboundRouteMsg(msg []byte) { return } - // Match correct account and sublist. - // We might want to make a local version to avoid any contention. - acc := c.srv.LookupAccount(string(c.pa.account)) - if acc == nil { - c.Debugf("Unknown account %q for routed message on subject: %q", c.pa.account, c.pa.subject) - return + var ( + acc *Account + rc *routeCache + r *SublistResult + ok bool + ) + + // Check our cache first. + if rc, ok = c.in.rcache[string(c.pa.rcache)]; ok { + // Check the genid to see if it's still valid. + if genid := atomic.LoadUint64(&rc.acc.sl.genid); genid != rc.genid { + ok = false + delete(c.in.rcache, string(c.pa.rcache)) + } else { + acc = rc.acc + r = rc.results + } + } + + if !ok { + // Match correct account and sublist. + acc = c.srv.LookupAccount(string(c.pa.account)) + if acc == nil { + c.Debugf("Unknown account %q for routed message on subject: %q", c.pa.account, c.pa.subject) + return + } + + // Match against the account sublist. + r = acc.sl.Match(string(c.pa.subject)) + + // Store in our cache + c.in.rcache[string(c.pa.rcache)] = &routeCache{acc, r, atomic.LoadUint64(&acc.sl.genid)} + + // Check if we need to prune. + if len(c.in.rcache) > maxRouteCacheSize { + c.pruneRouteCache() + } } // Check to see if we need to map/route to another account. @@ -250,9 +305,6 @@ func (c *client) processInboundRouteMsg(msg []byte) { c.checkForImportServices(acc, msg) } - // No L1 right now for routes since they multiplex over multiple accounts. - r := acc.sl.Match(string(c.pa.subject)) - // Check for no interest, short circuit if so. // This is the fanout scale. if len(r.psubs)+len(r.qsubs) == 0 { @@ -982,6 +1034,9 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { // Initialize c.initClient() + // Initialize the route cache. + c.in.rcache = make(map[string]*routeCache, maxRouteCacheSize) + if didSolicit { // Do this before the TLS code, otherwise, in case of failure // and if route is explicit, it would try to reconnect to 'nil'...