mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #990 from nats-io/smap
Optimize updates for leaf node smaps.
This commit is contained in:
@@ -50,6 +50,7 @@ type Account struct {
|
||||
nrleafs int32
|
||||
clients map[*client]*client
|
||||
rm map[string]int32
|
||||
lleafs []*client
|
||||
imports importMap
|
||||
exports exportMap
|
||||
limits
|
||||
@@ -259,6 +260,7 @@ func (a *Account) addClient(c *client) int {
|
||||
a.sysclients++
|
||||
} else if c.kind == LEAF {
|
||||
a.nleafs++
|
||||
a.lleafs = append(a.lleafs, c)
|
||||
}
|
||||
}
|
||||
a.mu.Unlock()
|
||||
@@ -268,6 +270,25 @@ func (a *Account) addClient(c *client) int {
|
||||
return n
|
||||
}
|
||||
|
||||
// Helper function to remove leaf nodes. If number of leafnodes gets large
|
||||
// this may need to be optimized out of linear search but believe number
|
||||
// of active leafnodes per account scope to be small and therefore cache friendly.
|
||||
// Lock should be held on account.
|
||||
func (a *Account) removeLeafNode(c *client) {
|
||||
ll := len(a.lleafs)
|
||||
for i, l := range a.lleafs {
|
||||
if l == c {
|
||||
a.lleafs[i] = a.lleafs[ll-1]
|
||||
if ll == 1 {
|
||||
a.lleafs = nil
|
||||
} else {
|
||||
a.lleafs = a.lleafs[:ll-1]
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// removeClient keeps our accounting of local active clients updated.
|
||||
func (a *Account) removeClient(c *client) int {
|
||||
a.mu.Lock()
|
||||
@@ -279,6 +300,7 @@ func (a *Account) removeClient(c *client) int {
|
||||
a.sysclients--
|
||||
} else if c.kind == LEAF {
|
||||
a.nleafs--
|
||||
a.removeLeafNode(c)
|
||||
}
|
||||
}
|
||||
a.mu.Unlock()
|
||||
|
||||
@@ -862,13 +862,13 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
|
||||
return
|
||||
}
|
||||
|
||||
_l := [256]*client{}
|
||||
_l := [32]*client{}
|
||||
leafs := _l[:0]
|
||||
|
||||
// Grab all leaf nodes. Ignore leafnode if sub's client is a leafnode and matches.
|
||||
acc.mu.RLock()
|
||||
for _, ln := range acc.clients {
|
||||
if ln.kind == LEAF && ln != sub.client {
|
||||
for _, ln := range acc.lleafs {
|
||||
if ln != sub.client {
|
||||
leafs = append(leafs, ln)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1275,6 +1275,9 @@ func (s *Server) StartProfiler() {
|
||||
s.profilingServer = srv
|
||||
s.mu.Unlock()
|
||||
|
||||
// Enable blocking profile
|
||||
runtime.SetBlockProfileRate(1)
|
||||
|
||||
go func() {
|
||||
// if this errors out, it's probably because the server is being shutdown
|
||||
err := srv.Serve(l)
|
||||
@@ -1706,7 +1709,7 @@ func tlsCipher(cs uint16) string {
|
||||
|
||||
// Remove a client or route from our internal accounting.
|
||||
func (s *Server) removeClient(c *client) {
|
||||
// type is immutable, so can check without lock
|
||||
// kind is immutable, so can check without lock
|
||||
switch c.kind {
|
||||
case CLIENT:
|
||||
c.mu.Lock()
|
||||
@@ -1864,7 +1867,7 @@ func (s *Server) ClusterAddr() *net.TCPAddr {
|
||||
return s.routeListener.Addr().(*net.TCPAddr)
|
||||
}
|
||||
|
||||
// ProfilerAddr returns the net.Addr object for the route listener.
|
||||
// ProfilerAddr returns the net.Addr object for the profiler listener.
|
||||
func (s *Server) ProfilerAddr() *net.TCPAddr {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
Reference in New Issue
Block a user