From 042e5a539a11e1b5f33e47dbc952444536896b60 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 9 May 2019 17:25:17 -0700 Subject: [PATCH] Optimize updates for leaf node smaps. Previously we would walk all clients bound to an account to collect the leaf nodes for updating of the subscription maps. Signed-off-by: Derek Collison --- server/accounts.go | 22 ++++++++++++++++++++++ server/leafnode.go | 6 +++--- server/server.go | 7 +++++-- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 5ab9ba44..82aff2be 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -50,6 +50,7 @@ type Account struct { nrleafs int32 clients map[*client]*client rm map[string]int32 + lleafs []*client imports importMap exports exportMap limits @@ -259,6 +260,7 @@ func (a *Account) addClient(c *client) int { a.sysclients++ } else if c.kind == LEAF { a.nleafs++ + a.lleafs = append(a.lleafs, c) } } a.mu.Unlock() @@ -268,6 +270,25 @@ func (a *Account) addClient(c *client) int { return n } +// Helper function to remove leaf nodes. If number of leafnodes gets large +// this may need to be optimized out of linear search but believe number +// of active leafnodes per account scope to be small and therefore cache friendly. +// Lock should be held on account. +func (a *Account) removeLeafNode(c *client) { + ll := len(a.lleafs) + for i, l := range a.lleafs { + if l == c { + a.lleafs[i] = a.lleafs[ll-1] + if ll == 1 { + a.lleafs = nil + } else { + a.lleafs = a.lleafs[:ll-1] + } + return + } + } +} + // removeClient keeps our accounting of local active clients updated. func (a *Account) removeClient(c *client) int { a.mu.Lock() @@ -279,6 +300,7 @@ func (a *Account) removeClient(c *client) int { a.sysclients-- } else if c.kind == LEAF { a.nleafs-- + a.removeLeafNode(c) } } a.mu.Unlock() diff --git a/server/leafnode.go b/server/leafnode.go index 0e133b69..4819fdb5 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -862,13 +862,13 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) { return } - _l := [256]*client{} + _l := [32]*client{} leafs := _l[:0] // Grab all leaf nodes. Ignore leafnode if sub's client is a leafnode and matches. acc.mu.RLock() - for _, ln := range acc.clients { - if ln.kind == LEAF && ln != sub.client { + for _, ln := range acc.lleafs { + if ln != sub.client { leafs = append(leafs, ln) } } diff --git a/server/server.go b/server/server.go index ad1b7069..c6f4a787 100644 --- a/server/server.go +++ b/server/server.go @@ -1270,6 +1270,9 @@ func (s *Server) StartProfiler() { s.profilingServer = srv s.mu.Unlock() + // Enable blocking profile + runtime.SetBlockProfileRate(1) + go func() { // if this errors out, it's probably because the server is being shutdown err := srv.Serve(l) @@ -1701,7 +1704,7 @@ func tlsCipher(cs uint16) string { // Remove a client or route from our internal accounting. func (s *Server) removeClient(c *client) { - // type is immutable, so can check without lock + // kind is immutable, so can check without lock switch c.kind { case CLIENT: c.mu.Lock() @@ -1859,7 +1862,7 @@ func (s *Server) ClusterAddr() *net.TCPAddr { return s.routeListener.Addr().(*net.TCPAddr) } -// ProfilerAddr returns the net.Addr object for the route listener. +// ProfilerAddr returns the net.Addr object for the profiler listener. func (s *Server) ProfilerAddr() *net.TCPAddr { s.mu.Lock() defer s.mu.Unlock()