mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 10:10:42 -07:00
L1 client cache for sublist and accounting optimizations
This commit is contained in:
@@ -46,6 +46,7 @@ type client struct {
|
||||
bw *bufio.Writer
|
||||
srv *Server
|
||||
subs map[string]*subscription
|
||||
cache readCache
|
||||
pcd map[*client]struct{}
|
||||
atmr *time.Timer
|
||||
ptmr *time.Timer
|
||||
@@ -59,6 +60,14 @@ type client struct {
|
||||
trace bool
|
||||
}
|
||||
|
||||
// Used in readloop to cache hot subject lookups and group statistics.
|
||||
type readCache struct {
|
||||
genid uint64
|
||||
inMsgs int64
|
||||
inBytes int64
|
||||
subs map[string][]*subscription
|
||||
}
|
||||
|
||||
func (c *client) String() (id string) {
|
||||
return c.ncs
|
||||
}
|
||||
@@ -154,6 +163,8 @@ func (c *client) readLoop() {
|
||||
return
|
||||
}
|
||||
|
||||
// read buffer
|
||||
// FIXME(dlc) make dynamic
|
||||
b := make([]byte, s.opts.BufSize)
|
||||
|
||||
for {
|
||||
@@ -164,6 +175,11 @@ func (c *client) readLoop() {
|
||||
}
|
||||
// Grab for updates for last activity.
|
||||
last := time.Now()
|
||||
|
||||
// Clear inbound stats cache
|
||||
c.cache.inMsgs = 0
|
||||
c.cache.inBytes = 0
|
||||
|
||||
if err := c.parse(b[:n]); err != nil {
|
||||
// handled inline
|
||||
if err != ErrMaxPayload && err != ErrAuthorization {
|
||||
@@ -173,6 +189,13 @@ func (c *client) readLoop() {
|
||||
}
|
||||
return
|
||||
}
|
||||
// Updates stats for client and server that were collected
|
||||
// from parsing through the buffer.
|
||||
atomic.AddInt64(&c.inMsgs, c.cache.inMsgs)
|
||||
atomic.AddInt64(&c.inBytes, c.cache.inBytes)
|
||||
atomic.AddInt64(&s.inMsgs, c.cache.inMsgs)
|
||||
atomic.AddInt64(&s.inBytes, c.cache.inBytes)
|
||||
|
||||
// Check pending clients for flush.
|
||||
for cp := range c.pcd {
|
||||
// Flush those in the set
|
||||
@@ -717,20 +740,20 @@ writeErr:
|
||||
|
||||
// processMsg is called to process an inbound msg from a client.
|
||||
func (c *client) processMsg(msg []byte) {
|
||||
|
||||
// Update statistics
|
||||
|
||||
// The msg includes the CR_LF, so pull back out for accounting.
|
||||
msgSize := int64(len(msg) - LEN_CR_LF)
|
||||
|
||||
// Since we don't grab the client's lock, use atomic here.
|
||||
// Monitor will also use atomic to read those.
|
||||
atomic.AddInt64(&c.inMsgs, 1)
|
||||
atomic.AddInt64(&c.inBytes, msgSize)
|
||||
|
||||
// Snapshot server.
|
||||
srv := c.srv
|
||||
|
||||
// Create cache subs map if needed.
|
||||
if c.cache.subs == nil && srv != nil {
|
||||
c.cache.subs = make(map[string][]*subscription)
|
||||
c.cache.genid = atomic.LoadUint64(&srv.sl.genid)
|
||||
}
|
||||
|
||||
// Update statistics
|
||||
// The msg includes the CR_LF, so pull back out for accounting.
|
||||
c.cache.inMsgs += 1
|
||||
c.cache.inBytes += int64(len(msg) - LEN_CR_LF)
|
||||
|
||||
if c.trace {
|
||||
c.traceMsg(msg)
|
||||
}
|
||||
@@ -741,11 +764,28 @@ func (c *client) processMsg(msg []byte) {
|
||||
return
|
||||
}
|
||||
|
||||
// Accounting
|
||||
atomic.AddInt64(&srv.inMsgs, 1)
|
||||
atomic.AddInt64(&srv.inBytes, msgSize)
|
||||
var genid uint64
|
||||
var r []*subscription
|
||||
var ok bool
|
||||
|
||||
subject := string(c.pa.subject)
|
||||
|
||||
if srv != nil {
|
||||
genid = atomic.LoadUint64(&srv.sl.genid)
|
||||
}
|
||||
|
||||
if genid == c.cache.genid && c.cache.subs != nil {
|
||||
r, ok = c.cache.subs[subject]
|
||||
} else {
|
||||
// reset
|
||||
c.cache.subs = make(map[string][]*subscription)
|
||||
c.cache.genid = genid
|
||||
}
|
||||
if !ok {
|
||||
r = srv.sl.Match(subject)
|
||||
c.cache.subs[subject] = r
|
||||
}
|
||||
|
||||
r := srv.sl.Match(string(c.pa.subject))
|
||||
if len(r) <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ const slCacheMax = 1024
|
||||
// A Sublist stores and efficiently retrieves subscriptions.
|
||||
type Sublist struct {
|
||||
sync.RWMutex
|
||||
genid uint64
|
||||
matches uint64
|
||||
cacheHits uint64
|
||||
inserts uint64
|
||||
@@ -128,9 +129,9 @@ func (s *Sublist) Insert(sub *subscription) error {
|
||||
s.inserts++
|
||||
|
||||
s.addToCache(subject, sub)
|
||||
atomic.AddUint64(&s.genid, 1)
|
||||
|
||||
s.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -305,6 +306,7 @@ func (s *Sublist) Remove(sub *subscription) error {
|
||||
}
|
||||
}
|
||||
s.removeFromCache(subject, sub)
|
||||
atomic.AddUint64(&s.genid, 1)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user