From bfe83aff811d47c54c76e15dc7390f6a6e0071ac Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 23 Apr 2019 17:07:12 -0700 Subject: [PATCH] Make account lookup faster with sync.Map Signed-off-by: Derek Collison --- server/accounts.go | 41 ++++++++----- server/accounts_test.go | 6 +- server/auth.go | 8 ++- server/client.go | 4 +- server/events.go | 54 +++++++++-------- server/gateway_test.go | 5 +- server/reload.go | 25 ++++++-- server/route.go | 8 ++- server/server.go | 127 ++++++++++++++++++++++++--------------- server/split_test.go | 4 +- test/bench_test.go | 129 ++++++++++++++++++++++++++++++++++++++++ 11 files changed, 303 insertions(+), 108 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index e81eb158..9229a3b0 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -130,16 +130,18 @@ func (a *Account) shallowCopy() *Account { // all known servers. func (a *Account) NumConnections() int { a.mu.RLock() - defer a.mu.RUnlock() - return len(a.clients) + int(a.nrclients) + nc := len(a.clients) + int(a.nrclients) + a.mu.RUnlock() + return nc } // NumLocalClients returns active number of clients for this account // on this server. func (a *Account) NumLocalConnections() int { a.mu.RLock() - defer a.mu.RUnlock() - return a.numLocalConnections() + nlc := a.numLocalConnections() + a.mu.RUnlock() + return nlc } // Do not account for the system accounts. @@ -150,8 +152,9 @@ func (a *Account) numLocalConnections() int { // MaxClientsReached returns if we have reached our limit for number of connections. func (a *Account) MaxTotalConnectionsReached() bool { a.mu.RLock() - defer a.mu.RUnlock() - return a.maxTotalConnectionsReached() + mtc := a.maxTotalConnectionsReached() + a.mu.RUnlock() + return mtc } func (a *Account) maxTotalConnectionsReached() bool { @@ -759,8 +762,9 @@ func (a *Account) checkServiceImportAuthorizedNoLock(account *Account, subject s // IsExpired returns expiration status. func (a *Account) IsExpired() bool { a.mu.RLock() - defer a.mu.RUnlock() - return a.expired + exp := a.expired + a.mu.RUnlock() + return exp } // Called when an account has expired. @@ -863,7 +867,7 @@ func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) { s.updateAccountClaims(a, ac) } -// updateAccountClaims will update and existing account with new claims. +// updateAccountClaims will update an existing account with new claims. // This will replace any exports or imports previously defined. func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { if a == nil { @@ -925,7 +929,10 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { } } for _, i := range ac.Imports { - acc := s.accounts[i.Account] + var acc *Account + if v, ok := s.accounts.Load(i.Account); ok { + acc = v.(*Account) + } if acc == nil { if acc, _ = s.fetchAccount(i.Account); acc == nil { s.Debugf("Can't locate account [%s] for import of [%v] %s", i.Account, i.Subject, i.Type) @@ -957,7 +964,8 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { clients := make([]*client, 0, 16) // We need to check all accounts that have an import claim from this account. awcsti := map[string]struct{}{} - for _, acc := range s.accounts { + s.accounts.Range(func(k, v interface{}) bool { + acc := v.(*Account) acc.mu.Lock() for _, im := range acc.imports.streams { if im != nil && im.acc.Name == a.Name { @@ -970,7 +978,8 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { } } acc.mu.Unlock() - } + return true + }) // Now walk clients. for _, c := range clients { c.processSubsOnConfigReload(awcsti) @@ -978,7 +987,8 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { } // Now check if service exports have changed. if !a.checkServiceExportsEqual(old) || signersChanged { - for _, acc := range s.accounts { + s.accounts.Range(func(k, v interface{}) bool { + acc := v.(*Account) acc.mu.Lock() for _, im := range acc.imports.services { if im != nil && im.acc.Name == a.Name { @@ -987,13 +997,16 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { } } acc.mu.Unlock() - } + return true + }) } // Now do limits if they are present. + a.mu.Lock() a.msubs = int32(ac.Limits.Subs) a.mpay = int32(ac.Limits.Payload) a.mconns = int32(ac.Limits.Conn) + a.mu.Unlock() clients := gatherClients() // Sort if we are over the limit. diff --git a/server/accounts_test.go b/server/accounts_test.go index 81a756ec..27ba0988 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -106,8 +106,8 @@ func TestAccountFromOptions(t *testing.T) { s := New(&opts) ta := s.numReservedAccounts() + 2 - if la := len(s.accounts); la != ta { - t.Fatalf("Expected to have a server with %d total accounts, got %v", ta, la) + if la := s.numAccounts(); la != ta { + t.Fatalf("Expected to have a server with %d active accounts, got %v", ta, la) } // Check that sl is filled in. fooAcc, _ := s.LookupAccount("foo") @@ -201,7 +201,7 @@ func TestActiveAccounts(t *testing.T) { t.Fatalf("Expected account bar to have 1 client, got %d", nc) } - waitTilActiveCount := func(n int) { + waitTilActiveCount := func(n int32) { t.Helper() checkFor(t, time.Second, 10*time.Millisecond, func() error { if active := s.NumActiveAccounts(); active != n { diff --git a/server/auth.go b/server/auth.go index dc321962..79bc58f8 100644 --- a/server/auth.go +++ b/server/auth.go @@ -201,7 +201,9 @@ func (s *Server) configureAuthorization() { for _, u := range opts.Nkeys { copy := u.clone() if u.Account != nil { - copy.Account = s.accounts[u.Account.Name] + if v, ok := s.accounts.Load(u.Account.Name); ok { + copy.Account = v.(*Account) + } } s.nkeys[u.Nkey] = copy } @@ -211,7 +213,9 @@ func (s *Server) configureAuthorization() { for _, u := range opts.Users { copy := u.clone() if u.Account != nil { - copy.Account = s.accounts[u.Account.Name] + if v, ok := s.accounts.Load(u.Account.Name); ok { + copy.Account = v.(*Account) + } } s.users[u.Username] = copy } diff --git a/server/client.go b/server/client.go index e2ef7030..c8692b92 100644 --- a/server/client.go +++ b/server/client.go @@ -2895,9 +2895,7 @@ func (c *client) closeConnection(reason ClosedState) { srv.updateLeafNodes(acc, esub.sub, -(esub.n)) } if prev := acc.removeClient(c); prev == 1 && srv != nil { - srv.mu.Lock() - srv.activeAccounts-- - srv.mu.Unlock() + srv.decActiveAccounts() } } } diff --git a/server/events.go b/server/events.go index 0f6c55eb..5423b801 100644 --- a/server/events.go +++ b/server/events.go @@ -268,8 +268,9 @@ func (s *Server) sendInternalMsg(sub, rply string, si *ServerInfo, msg interface // Locked version of checking if events system running. Also checks server. func (s *Server) eventsRunning() bool { s.mu.Lock() - defer s.mu.Unlock() - return s.running && s.eventsEnabled() + er := s.running && s.eventsEnabled() + s.mu.Unlock() + return er } // EventsEnabled will report if the server has internal events enabled via @@ -343,7 +344,7 @@ func (s *Server) sendStatsz(subj string) { m.Stats.Start = s.start m.Stats.Connections = len(s.clients) m.Stats.TotalConnections = s.totalClients - m.Stats.ActiveAccounts = s.activeAccounts + m.Stats.ActiveAccounts = int(atomic.LoadInt32(&s.activeAccounts)) m.Stats.Received.Msgs = atomic.LoadInt64(&s.inMsgs) m.Stats.Received.Bytes = atomic.LoadInt64(&s.inBytes) m.Stats.Sent.Msgs = atomic.LoadInt64(&s.outMsgs) @@ -467,21 +468,24 @@ func (s *Server) accountClaimUpdate(sub *subscription, subject, reply string, ms s.Debugf("Received account claims update on bad subject %q", subject) return } - accName := toks[accUpdateAccIndex] - s.updateAccountWithClaimJWT(s.accounts[accName], string(msg)) + if v, ok := s.accounts.Load(toks[accUpdateAccIndex]); ok { + s.updateAccountWithClaimJWT(v.(*Account), string(msg)) + } } // processRemoteServerShutdown will update any affected accounts. // Will update the remote count for clients. // Lock assume held. func (s *Server) processRemoteServerShutdown(sid string) { - for _, a := range s.accounts { + s.accounts.Range(func(k, v interface{}) bool { + a := v.(*Account) a.mu.Lock() prev := a.strack[sid] delete(a.strack, sid) a.nrclients -= prev a.mu.Unlock() - } + return true + }) } // remoteServerShutdownEvent is called when we get an event from another server shutting down. @@ -542,7 +546,8 @@ func (s *Server) shutdownEventing() { defer s.mu.Unlock() // Whip through all accounts. - for _, a := range s.accounts { + s.accounts.Range(func(k, v interface{}) bool { + a := v.(*Account) a.mu.Lock() a.nrclients = 0 // Now clear state @@ -551,16 +556,15 @@ func (s *Server) shutdownEventing() { a.clients = nil a.strack = nil a.mu.Unlock() - } + return true + }) // Turn everything off here. s.sys = nil } // Request for our local connection count. func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []byte) { - s.mu.Lock() - defer s.mu.Unlock() - if !s.eventsEnabled() { + if !s.eventsRunning() { return } m := accNumConnsReq{} @@ -573,7 +577,9 @@ func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []by return } if nlc := acc.NumLocalConnections(); nlc > 0 { + s.mu.Lock() s.sendAccConnsUpdate(acc, reply) + s.mu.Unlock() } } @@ -587,15 +593,14 @@ func (s *Server) leafNodeConnected(sub *subscription, subject, reply string, msg } s.mu.Lock() - gateway := s.gateway - if m.Account == "" || !s.eventsEnabled() || !gateway.enabled { - s.mu.Unlock() - return - } - acc, _ := s.lookupAccount(m.Account) + na := m.Account == "" || !s.eventsEnabled() || !s.gateway.enabled s.mu.Unlock() - if acc != nil { + if na { + return + } + + if acc, _ := s.lookupAccount(m.Account); acc != nil { s.switchAccountToInterestMode(acc.Name) } } @@ -612,9 +617,7 @@ func (s *Server) statszReq(sub *subscription, subject, reply string, msg []byte) // remoteConnsUpdate gets called when we receive a remote update from another server. func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg []byte) { - s.mu.Lock() - defer s.mu.Unlock() - if !s.eventsEnabled() { + if !s.eventsRunning() { return } m := AccountNumConns{} @@ -622,13 +625,16 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg s.sys.client.Errorf("Error unmarshalling account connection event message: %v", err) return } + // See if we have the account registered, if not drop it. + acc, _ := s.lookupAccount(m.Account) + s.mu.Lock() + defer s.mu.Unlock() + // Double check that this is not us, should never happen, so error if it does. if m.Server.ID == s.info.ID { s.sys.client.Errorf("Processing our own account connection event message: ignored") return } - // See if we have the account registered, if not drop it. - acc, _ := s.lookupAccount(m.Account) if acc == nil { s.sys.client.Debugf("Received account connection event for unknown account: %s", m.Account) return diff --git a/server/gateway_test.go b/server/gateway_test.go index b665d6e4..b865043b 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -2987,9 +2987,7 @@ func TestGatewaySendAllSubs(t *testing.T) { if scount != consCount { return fmt.Errorf("Expected %v consumers for global account, got %v", consCount, scount) } - sc.mu.Lock() - acount := len(sc.accounts) - sc.mu.Unlock() + acount := sc.numAccounts() if acount != accsCount+1 { return fmt.Errorf("Expected %v accounts, got %v", accsCount+1, acount) } @@ -4434,6 +4432,7 @@ func TestGatewayServiceExportWithWildcards(t *testing.T) { } } +// NOTE: if this fails for you and says only has <10 outbound, make sure ulimit for open files > 256. func TestGatewayMemUsage(t *testing.T) { // Try to clean up. runtime.GC() diff --git a/server/reload.go b/server/reload.go index 333daec5..2b9dcd24 100644 --- a/server/reload.go +++ b/server/reload.go @@ -787,17 +787,30 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { func (s *Server) reloadAuthorization() { s.mu.Lock() - oldAccounts := s.accounts - s.accounts, s.gacc = nil, nil + // We need to drain the old accounts here since we have something + // new configured. We do not want s.accounts to change since that would + // mean adding a lock to lookupAccount which is what we are tryin to optimize + // with the change from a map to a sync.Map. + oldAccounts := make(map[string]*Account) + s.accounts.Range(func(k, v interface{}) bool { + acc := v.(*Account) + acc.mu.RLock() + oldAccounts[acc.Name] = acc + acc.mu.RUnlock() + s.accounts.Delete(k) + return true + }) + s.gacc = nil s.configureAccounts() s.configureAuthorization() // This map will contain the names of accounts that have their streams // import configuration changed. - awcsti := make(map[string]struct{}, len(s.accounts)) + awcsti := make(map[string]struct{}) - for _, newAcc := range s.accounts { + s.accounts.Range(func(k, v interface{}) bool { + newAcc := v.(*Account) if acc, ok := oldAccounts[newAcc.Name]; ok { // If account exist in latest config, "transfer" the account's // sublist and client map to the new account. @@ -817,7 +830,9 @@ func (s *Server) reloadAuthorization() { awcsti[newAcc.Name] = struct{}{} } } - } + return true + }) + // Gather clients that changed accounts. We will close them and they // will reconnect, doing the right thing. var ( diff --git a/server/route.go b/server/route.go index 408f984c..37c32d4a 100644 --- a/server/route.go +++ b/server/route.go @@ -890,8 +890,9 @@ func (s *Server) sendSubsToRoute(route *client) { eSize := 0 // Send over our account subscriptions. // copy accounts into array first - accs := make([]*Account, 0, len(s.accounts)) - for _, a := range s.accounts { + accs := make([]*Account, 0, 32) + s.accounts.Range(func(k, v interface{}) bool { + a := v.(*Account) accs = append(accs, a) a.mu.RLock() // Proto looks like: "RS+ [ ]\r\n" @@ -900,7 +901,8 @@ func (s *Server) sendSubsToRoute(route *client) { // later going over each account. eSize += len(a.rm) * (4 + len(a.Name) + 256) a.mu.RUnlock() - } + return true + }) s.mu.Unlock() sendSubs := func(accs []*Account) { diff --git a/server/server.go b/server/server.go index b510f261..80b9541b 100644 --- a/server/server.go +++ b/server/server.go @@ -98,8 +98,8 @@ type Server struct { listener net.Listener gacc *Account sys *internal - accounts map[string]*Account - activeAccounts int + accounts sync.Map + activeAccounts int32 accResolver AccountResolver clients map[uint64]*client routes map[uint64]*client @@ -339,11 +339,8 @@ func (s *Server) globalAccount() *Account { return gacc } +// Used to setup Accounts. func (s *Server) configureAccounts() error { - // Used to setup Accounts. - if s.accounts == nil { - s.accounts = make(map[string]*Account) - } // Create global account. if s.gacc == nil { s.gacc = NewAccount(globalAccountName) @@ -368,10 +365,15 @@ func (s *Server) configureAccounts() error { return } for sub, a := range ea.approved { - ea.approved[sub] = s.accounts[a.Name] + var acc *Account + if v, ok := s.accounts.Load(a.Name); ok { + acc = v.(*Account) + } + ea.approved[sub] = acc } } - for _, acc := range s.accounts { + s.accounts.Range(func(k, v interface{}) bool { + acc := v.(*Account) // Exports for _, ea := range acc.exports.streams { swapApproved(ea) @@ -381,12 +383,17 @@ func (s *Server) configureAccounts() error { } // Imports for _, si := range acc.imports.streams { - si.acc = s.accounts[si.acc.Name] + if v, ok := s.accounts.Load(si.acc.Name); ok { + si.acc = v.(*Account) + } } for _, si := range acc.imports.services { - si.acc = s.accounts[si.acc.Name] + if v, ok := s.accounts.Load(si.acc.Name); ok { + si.acc = v.(*Account) + } } - } + return true + }) // Check for configured account resolvers. if opts.AccountResolver != nil { @@ -405,10 +412,15 @@ func (s *Server) configureAccounts() error { } // Set the system account if it was configured. if opts.SystemAccount != _EMPTY_ { - if _, err := s.lookupAccount(opts.SystemAccount); err != nil { + // Lock is held entering this function, so release to call lookupAccount. + s.mu.Unlock() + _, err := s.lookupAccount(opts.SystemAccount) + s.mu.Lock() + if err != nil { return fmt.Errorf("error resolving system account: %v", err) } } + return nil } @@ -545,60 +557,66 @@ func (s *Server) numReservedAccounts() int { } // NumActiveAccounts reports number of active accounts on this server. -func (s *Server) NumActiveAccounts() int { - s.mu.Lock() - defer s.mu.Unlock() - return s.activeAccounts +func (s *Server) NumActiveAccounts() int32 { + return atomic.LoadInt32(&s.activeAccounts) } // incActiveAccounts() just adds one under lock. func (s *Server) incActiveAccounts() { - s.mu.Lock() - s.activeAccounts++ - s.mu.Unlock() + atomic.AddInt32(&s.activeAccounts, 1) } -// dev=cActiveAccounts() just subtracts one under lock. +// decActiveAccounts() just subtracts one under lock. func (s *Server) decActiveAccounts() { + atomic.AddInt32(&s.activeAccounts, -1) +} + +// This should be used for testing only. Will be slow since we have to +// range over all accounts in the sync.Map to count. +func (s *Server) numAccounts() int { + count := 0 s.mu.Lock() - s.activeAccounts-- + s.accounts.Range(func(k, v interface{}) bool { + count++ + return true + }) s.mu.Unlock() + return count } // LookupOrRegisterAccount will return the given account if known or create a new entry. func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) { - s.mu.Lock() - defer s.mu.Unlock() - if acc, ok := s.accounts[name]; ok { - return acc, false + if v, ok := s.accounts.Load(name); ok { + return v.(*Account), false } + s.mu.Lock() acc := NewAccount(name) s.registerAccount(acc) + s.mu.Unlock() return acc, true } // RegisterAccount will register an account. The account must be new // or this call will fail. func (s *Server) RegisterAccount(name string) (*Account, error) { - s.mu.Lock() - defer s.mu.Unlock() - - if _, ok := s.accounts[name]; ok { + if _, ok := s.accounts.Load(name); ok { return nil, ErrAccountExists } + s.mu.Lock() acc := NewAccount(name) s.registerAccount(acc) + s.mu.Unlock() return acc, nil } // SetSystemAccount will set the internal system account. // If root operators are present it will also check validity. func (s *Server) SetSystemAccount(accName string) error { - s.mu.Lock() - if acc := s.accounts[accName]; acc != nil { - s.mu.Unlock() - return s.setSystemAccount(acc) + if v, ok := s.accounts.Load(accName); ok { + return s.setSystemAccount(v.(*Account)) } + + s.mu.Lock() // If we are here we do not have local knowledge of this account. // Do this one by hand to return more useful error. ac, jwt, err := s.fetchAccountClaims(accName) @@ -610,6 +628,7 @@ func (s *Server) SetSystemAccount(accName string) error { acc.claimJWT = jwt s.registerAccount(acc) s.mu.Unlock() + return s.setSystemAccount(acc) } @@ -684,12 +703,13 @@ func (s *Server) setSystemAccount(acc *Account) error { } func (s *Server) systemAccount() *Account { + var sacc *Account s.mu.Lock() - defer s.mu.Unlock() - if s.sys == nil { - return nil + if s.sys != nil { + sacc = s.sys.account } - return s.sys.account + s.mu.Unlock() + return sacc } // Determine if accounts should track subscriptions for @@ -726,20 +746,25 @@ func (s *Server) registerAccount(acc *Account) { } acc.srv = s acc.mu.Unlock() - s.accounts[acc.Name] = acc + s.accounts.Store(acc.Name, acc) s.enableAccountTracking(acc) } // lookupAccount is a function to return the account structure // associated with an account name. -// Lock should be held on entry. func (s *Server) lookupAccount(name string) (*Account, error) { - acc := s.accounts[name] - if acc != nil { + if v, ok := s.accounts.Load(name); ok { + acc := v.(*Account) // If we are expired and we have a resolver, then // return the latest information from the resolver. - if s.accResolver != nil && acc.IsExpired() { - if err := s.updateAccount(acc); err != nil { + if acc.IsExpired() { + var err error + s.mu.Lock() + if s.accResolver != nil { + err = s.updateAccount(acc) + } + s.mu.Unlock() + if err != nil { return nil, err } } @@ -749,14 +774,15 @@ func (s *Server) lookupAccount(name string) (*Account, error) { if s.accResolver == nil { return nil, ErrMissingAccount } - return s.fetchAccount(name) + s.mu.Lock() + acc, err := s.fetchAccount(name) + s.mu.Unlock() + return acc, err } // LookupAccount is a public function to return the account structure // associated with name. func (s *Server) LookupAccount(name string) (*Account, error) { - s.mu.Lock() - defer s.mu.Unlock() return s.lookupAccount(name) } @@ -850,7 +876,8 @@ func (s *Server) fetchAccount(name string) (*Account, error) { // We have released the lock during the low level fetch. // Now that we are back under lock, check again if account // is in the map or not. If it is, simply return it. - if acc := s.accounts[name]; acc != nil { + if v, ok := s.accounts.Load(name); ok { + acc := v.(*Account) // Update with the new claims in case they are new. // Following call will return ErrAccountResolverSameClaims // if claims are the same. @@ -1778,11 +1805,13 @@ func (s *Server) NumSubscriptions() uint32 { // Lock should be held. func (s *Server) numSubscriptions() uint32 { var subs int - for _, acc := range s.accounts { + s.accounts.Range(func(k, v interface{}) bool { + acc := v.(*Account) if acc.sl != nil { subs += acc.TotalSubs() } - } + return true + }) return uint32(subs) } diff --git a/server/split_test.go b/server/split_test.go index 7c901646..b0029380 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -28,7 +28,7 @@ func TestSplitBufferSubOp(t *testing.T) { if err != nil { t.Fatalf("Error creating gateways: %v", err) } - s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: gws} + s := &Server{gacc: NewAccount(globalAccountName), gateway: gws} s.registerAccount(s.gacc) c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription), nc: cli} @@ -65,7 +65,7 @@ func TestSplitBufferSubOp(t *testing.T) { } func TestSplitBufferUnsubOp(t *testing.T) { - s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: &srvGateway{}} + s := &Server{gacc: NewAccount(globalAccountName), gateway: &srvGateway{}} s.registerAccount(s.gacc) c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} diff --git a/test/bench_test.go b/test/bench_test.go index cc6c80cf..4dd19834 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -1126,3 +1126,132 @@ func Benchmark_Gateways_InterestOnly_2kx10x1(b *testing.B) { func Benchmark_Gateways_InterestOnly_4kx10x1(b *testing.B) { gatewaysBench(b, false, sizedString(4096), 10, true) } + +// Run some benchmarks against interest churn across routes. +// Watching for contention retrieving accounts, etc. +func Benchmark_________________LookupAccount(b *testing.B) { + s := runBenchServer() + defer s.Shutdown() + + const acc = "$foo.bar" + + if _, err := s.RegisterAccount(acc); err != nil { + b.Fatalf("Error registering '%s' - %v", acc, err) + } + // Now create Go routines to cycle through Lookups. + numRoutines := 100 + loop := b.N / numRoutines + + startCh := make(chan bool) + + lookupLoop := func(ready, done chan bool) { + // Signal we are ready + close(ready) + // Wait to start up actual unsubs. + <-startCh + + for i := 0; i < loop; i++ { + if _, err := s.LookupAccount(acc); err != nil { + b.Errorf("Error looking up account: %v", err) + } + } + close(done) + } + + da := make([]chan bool, 0, numRoutines) + for i := 0; i < numRoutines; i++ { + ready := make(chan bool) + done := make(chan bool) + go lookupLoop(ready, done) + da = append(da, done) + <-ready + } + + b.ResetTimer() + close(startCh) + for _, ch := range da { + <-ch + } + b.StopTimer() +} + +func Benchmark___________RoutedInterestGraph(b *testing.B) { + s, o := RunServerWithConfig("./configs/srv_a.conf") + o.AllowNewAccounts = true + defer s.Shutdown() + + numRoutes := 100 + loop := b.N / numRoutes + + type rh struct { + r net.Conn + send sendFun + expect expectFun + done chan bool + } + + routes := make([]*rh, 0, numRoutes) + for i := 0; i < numRoutes; i++ { + r := createRouteConn(b, o.Cluster.Host, o.Cluster.Port) + defer r.Close() + + checkInfoMsg(b, r) + send, expect := setupRoute(b, r, o) + send("PING\r\n") + expect(pongRe) + + bw := bufio.NewWriterSize(r, defaultSendBufSize) + + account := fmt.Sprintf("$foo.account.%d", i) + for s := 0; s < loop; s++ { + bw.Write([]byte(fmt.Sprintf("RS+ %s foo.bar.%d\r\n", account, s))) + } + bw.Flush() + send("PING\r\n") + expect(pongRe) + routes = append(routes, &rh{r, send, expect, make(chan bool)}) + } + + startCh := make(chan bool) + + unsubLoop := func(route *rh, ch chan bool, index int) { + bw := bufio.NewWriterSize(route.r, defaultSendBufSize) + account := fmt.Sprintf("$foo.account.%d", index) + + // Signal we are ready + close(ch) + + // Wait to start up actual unsubs. + <-startCh + + for i := 0; i < loop; i++ { + _, err := bw.Write([]byte(fmt.Sprintf("RS- %s foo.bar.%d\r\n", account, i))) + if err != nil { + b.Errorf("Received error on RS- write: %v\n", err) + return + } + } + err := bw.Flush() + if err != nil { + b.Errorf("Received error on FLUSH write: %v\n", err) + return + } + route.send("PING\r\n") + route.expect(pongRe) + close(route.done) + } + + for i, route := range routes { + ch := make(chan bool) + go unsubLoop(route, ch, i) + <-ch + } + + // Actual unsub test here. + b.ResetTimer() + close(startCh) + for _, route := range routes { + <-route.done + } + b.StopTimer() +}