From d0a9a47ea35e5b37455b2a10eb075a78fbb04508 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 3 Apr 2016 05:41:45 -0700 Subject: [PATCH] L1 client cache for sublist and accounting optimizations --- server/client.go | 70 +++++++++++++++++++++++++++++++++++++---------- server/sublist.go | 4 ++- 2 files changed, 58 insertions(+), 16 deletions(-) diff --git a/server/client.go b/server/client.go index 4f5a61e9..84528d23 100644 --- a/server/client.go +++ b/server/client.go @@ -46,6 +46,7 @@ type client struct { bw *bufio.Writer srv *Server subs map[string]*subscription + cache readCache pcd map[*client]struct{} atmr *time.Timer ptmr *time.Timer @@ -59,6 +60,14 @@ type client struct { trace bool } +// Used in readloop to cache hot subject lookups and group statistics. +type readCache struct { + genid uint64 + inMsgs int64 + inBytes int64 + subs map[string][]*subscription +} + func (c *client) String() (id string) { return c.ncs } @@ -154,6 +163,8 @@ func (c *client) readLoop() { return } + // read buffer + // FIXME(dlc) make dynamic b := make([]byte, s.opts.BufSize) for { @@ -164,6 +175,11 @@ func (c *client) readLoop() { } // Grab for updates for last activity. last := time.Now() + + // Clear inbound stats cache + c.cache.inMsgs = 0 + c.cache.inBytes = 0 + if err := c.parse(b[:n]); err != nil { // handled inline if err != ErrMaxPayload && err != ErrAuthorization { @@ -173,6 +189,13 @@ func (c *client) readLoop() { } return } + // Updates stats for client and server that were collected + // from parsing through the buffer. + atomic.AddInt64(&c.inMsgs, c.cache.inMsgs) + atomic.AddInt64(&c.inBytes, c.cache.inBytes) + atomic.AddInt64(&s.inMsgs, c.cache.inMsgs) + atomic.AddInt64(&s.inBytes, c.cache.inBytes) + // Check pending clients for flush. for cp := range c.pcd { // Flush those in the set @@ -717,20 +740,20 @@ writeErr: // processMsg is called to process an inbound msg from a client. func (c *client) processMsg(msg []byte) { - - // Update statistics - - // The msg includes the CR_LF, so pull back out for accounting. - msgSize := int64(len(msg) - LEN_CR_LF) - - // Since we don't grab the client's lock, use atomic here. - // Monitor will also use atomic to read those. - atomic.AddInt64(&c.inMsgs, 1) - atomic.AddInt64(&c.inBytes, msgSize) - // Snapshot server. srv := c.srv + // Create cache subs map if needed. + if c.cache.subs == nil && srv != nil { + c.cache.subs = make(map[string][]*subscription) + c.cache.genid = atomic.LoadUint64(&srv.sl.genid) + } + + // Update statistics + // The msg includes the CR_LF, so pull back out for accounting. + c.cache.inMsgs += 1 + c.cache.inBytes += int64(len(msg) - LEN_CR_LF) + if c.trace { c.traceMsg(msg) } @@ -741,11 +764,28 @@ func (c *client) processMsg(msg []byte) { return } - // Accounting - atomic.AddInt64(&srv.inMsgs, 1) - atomic.AddInt64(&srv.inBytes, msgSize) + var genid uint64 + var r []*subscription + var ok bool + + subject := string(c.pa.subject) + + if srv != nil { + genid = atomic.LoadUint64(&srv.sl.genid) + } + + if genid == c.cache.genid && c.cache.subs != nil { + r, ok = c.cache.subs[subject] + } else { + // reset + c.cache.subs = make(map[string][]*subscription) + c.cache.genid = genid + } + if !ok { + r = srv.sl.Match(subject) + c.cache.subs[subject] = r + } - r := srv.sl.Match(string(c.pa.subject)) if len(r) <= 0 { return } diff --git a/server/sublist.go b/server/sublist.go index 5be67ace..2569a2a9 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -33,6 +33,7 @@ const slCacheMax = 1024 // A Sublist stores and efficiently retrieves subscriptions. type Sublist struct { sync.RWMutex + genid uint64 matches uint64 cacheHits uint64 inserts uint64 @@ -128,9 +129,9 @@ func (s *Sublist) Insert(sub *subscription) error { s.inserts++ s.addToCache(subject, sub) + atomic.AddUint64(&s.genid, 1) s.Unlock() - return nil } @@ -305,6 +306,7 @@ func (s *Sublist) Remove(sub *subscription) error { } } s.removeFromCache(subject, sub) + atomic.AddUint64(&s.genid, 1) return nil }