mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
No need to hold server write lock since sendq has its own.
I noticed some contention when I was investigating a catchup bug on the server write lock. Medium term we could have a separate lock, longer term formal client support in the server will alleviate. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
committed by
Ivan Kozlovic
parent
a5119008a5
commit
d54899de0a
@@ -481,9 +481,9 @@ func (s *Server) sendInternalAccountMsg(a *Account, subject string, msg interfac
|
||||
|
||||
// Used to send an internal message with an optional reply to an arbitrary account.
|
||||
func (s *Server) sendInternalAccountMsgWithReply(a *Account, subject, reply string, hdr map[string]string, msg interface{}, echo bool) error {
|
||||
s.mu.Lock()
|
||||
s.mu.RLock()
|
||||
if s.sys == nil || s.sys.sendq == nil {
|
||||
s.mu.Unlock()
|
||||
s.mu.RUnlock()
|
||||
return ErrNoSysAccount
|
||||
}
|
||||
c := s.sys.client
|
||||
@@ -494,16 +494,16 @@ func (s *Server) sendInternalAccountMsgWithReply(a *Account, subject, reply stri
|
||||
a.mu.Unlock()
|
||||
}
|
||||
s.sys.sendq.push(newPubMsg(c, subject, reply, nil, hdr, msg, noCompression, echo, false))
|
||||
s.mu.Unlock()
|
||||
s.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// This will queue up a message to be sent.
|
||||
// Lock should not be held.
|
||||
func (s *Server) sendInternalMsgLocked(subj, rply string, si *ServerInfo, msg interface{}) {
|
||||
s.mu.Lock()
|
||||
s.mu.RLock()
|
||||
s.sendInternalMsg(subj, rply, si, msg)
|
||||
s.mu.Unlock()
|
||||
s.mu.RUnlock()
|
||||
}
|
||||
|
||||
// This will queue up a message to be sent.
|
||||
@@ -517,13 +517,13 @@ func (s *Server) sendInternalMsg(subj, rply string, si *ServerInfo, msg interfac
|
||||
|
||||
// Will send an api response.
|
||||
func (s *Server) sendInternalResponse(subj string, response *ServerAPIResponse) {
|
||||
s.mu.Lock()
|
||||
s.mu.RLock()
|
||||
if s.sys == nil || s.sys.sendq == nil {
|
||||
s.mu.Unlock()
|
||||
s.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
s.sys.sendq.push(newPubMsg(nil, subj, _EMPTY_, response.Server, nil, response, response.compress, false, false))
|
||||
s.mu.Unlock()
|
||||
s.mu.RUnlock()
|
||||
}
|
||||
|
||||
// Used to send internal messages from other system clients to avoid no echo issues.
|
||||
@@ -535,13 +535,13 @@ func (c *client) sendInternalMsg(subj, rply string, si *ServerInfo, msg interfac
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.mu.RLock()
|
||||
if s.sys == nil || s.sys.sendq == nil {
|
||||
s.mu.Unlock()
|
||||
s.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
s.sys.sendq.push(newPubMsg(c, subj, rply, si, nil, msg, noCompression, false, false))
|
||||
s.mu.Unlock()
|
||||
s.mu.RUnlock()
|
||||
}
|
||||
|
||||
// Locked version of checking if events system running. Also checks server.
|
||||
|
||||
Reference in New Issue
Block a user