mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Do not inline inbound system messages to avoid blocking routes, gateways etc.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -83,6 +83,17 @@ const (
|
||||
// FIXME(dlc) - make configurable.
|
||||
var eventsHBInterval = 30 * time.Second
|
||||
|
||||
// Used if we have to queue things internally to avoid the route/gw path.
|
||||
type inSysMsg struct {
|
||||
sub *subscription
|
||||
c *client
|
||||
acc *Account
|
||||
subj string
|
||||
rply string
|
||||
msg []byte
|
||||
cb msgHandler
|
||||
}
|
||||
|
||||
// Used to send and receive messages from inside the server.
|
||||
type internal struct {
|
||||
account *Account
|
||||
@@ -94,6 +105,7 @@ type internal struct {
|
||||
stmr *time.Timer
|
||||
replies map[string]msgHandler
|
||||
sendq *ipQueue[*pubMsg]
|
||||
recvq *ipQueue[*inSysMsg]
|
||||
resetCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
sq *sendq
|
||||
@@ -300,6 +312,33 @@ type TypedEvent struct {
|
||||
Time time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// internalReceiveLoop will be responsible for dispatching all messages that
|
||||
// a server receives and needs to internally process, e.g. internal subs.
|
||||
func (s *Server) internalReceiveLoop() {
|
||||
s.mu.RLock()
|
||||
if s.sys == nil || s.sys.recvq == nil {
|
||||
s.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
recvq := s.sys.recvq
|
||||
s.mu.RUnlock()
|
||||
|
||||
for s.eventsRunning() {
|
||||
select {
|
||||
case <-recvq.ch:
|
||||
msgs := recvq.pop()
|
||||
for _, m := range msgs {
|
||||
if m.cb != nil {
|
||||
m.cb(m.sub, m.c, m.acc, m.subj, m.rply, m.msg)
|
||||
}
|
||||
}
|
||||
recvq.recycle(&msgs)
|
||||
case <-s.quitCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// internalSendLoop will be responsible for serializing all messages that
|
||||
// a server wants to send.
|
||||
func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
|
||||
@@ -841,26 +880,26 @@ func (s *Server) initEventTracking() {
|
||||
s.sys.inboxPre = subject
|
||||
// This is for remote updates for connection accounting.
|
||||
subject = fmt.Sprintf(accConnsEventSubjOld, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil {
|
||||
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking for %s: %v", subject, err)
|
||||
}
|
||||
// This will be for responses for account info that we send out.
|
||||
subject = fmt.Sprintf(connsRespSubj, s.info.ID)
|
||||
if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil {
|
||||
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// Listen for broad requests to respond with number of subscriptions for a given subject.
|
||||
if _, err := s.sysSubscribe(accNumSubsReqSubj, s.nsubsRequest); err != nil {
|
||||
if _, err := s.sysSubscribe(accNumSubsReqSubj, s.noInlineCallback(s.nsubsRequest)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// Listen for statsz from others.
|
||||
subject = fmt.Sprintf(serverStatsSubj, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.remoteServerUpdate); err != nil {
|
||||
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// Listen for all server shutdowns.
|
||||
subject = fmt.Sprintf(shutdownEventSubj, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.remoteServerShutdown); err != nil {
|
||||
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// Listen for account claims updates.
|
||||
@@ -870,14 +909,14 @@ func (s *Server) initEventTracking() {
|
||||
}
|
||||
if subscribeToUpdate {
|
||||
for _, sub := range []string{accUpdateEventSubjOld, accUpdateEventSubjNew} {
|
||||
if _, err := s.sysSubscribe(fmt.Sprintf(sub, "*"), s.accountClaimUpdate); err != nil {
|
||||
if _, err := s.sysSubscribe(fmt.Sprintf(sub, "*"), s.noInlineCallback(s.accountClaimUpdate)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Listen for ping messages that will be sent to all servers for statsz.
|
||||
// This subscription is kept for backwards compatibility. Got replaced by ...PING.STATZ from below
|
||||
if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.statszReq); err != nil {
|
||||
if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
monSrvc := map[string]msgHandler{
|
||||
@@ -921,11 +960,11 @@ func (s *Server) initEventTracking() {
|
||||
}
|
||||
for name, req := range monSrvc {
|
||||
subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name)
|
||||
if _, err := s.sysSubscribe(subject, req); err != nil {
|
||||
if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
subject = fmt.Sprintf(serverPingReqSubj, name)
|
||||
if _, err := s.sysSubscribe(subject, req); err != nil {
|
||||
if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -1017,14 +1056,14 @@ func (s *Server) initEventTracking() {
|
||||
"CONNS": s.connsRequest,
|
||||
}
|
||||
for name, req := range monAccSrvc {
|
||||
if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), req); err != nil {
|
||||
if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), s.noInlineCallback(req)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// For now only the STATZ subject has an account specific ping equivalent.
|
||||
if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"),
|
||||
func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
|
||||
s.noInlineCallback(func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
|
||||
optz := &AccountStatzEventOptions{}
|
||||
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
|
||||
if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil {
|
||||
@@ -1035,23 +1074,23 @@ func (s *Server) initEventTracking() {
|
||||
return stz, nil
|
||||
}
|
||||
})
|
||||
}); err != nil {
|
||||
})); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
|
||||
// Listen for updates when leaf nodes connect for a given account. This will
|
||||
// force any gateway connections to move to `modeInterestOnly`
|
||||
subject = fmt.Sprintf(leafNodeConnectEventSubj, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.leafNodeConnected); err != nil {
|
||||
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.leafNodeConnected)); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// For tracking remote latency measurements.
|
||||
subject = fmt.Sprintf(remoteLatencyEventSubj, s.sys.shash)
|
||||
if _, err := s.sysSubscribe(subject, s.remoteLatencyUpdate); err != nil {
|
||||
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteLatencyUpdate)); err != nil {
|
||||
s.Errorf("Error setting up internal latency tracking: %v", err)
|
||||
}
|
||||
// This is for simple debugging of number of subscribers that exist in the system.
|
||||
if _, err := s.sysSubscribeInternal(accSubsSubj, s.debugSubscribers); err != nil {
|
||||
if _, err := s.sysSubscribeInternal(accSubsSubj, s.noInlineCallback(s.debugSubscribers)); err != nil {
|
||||
s.Errorf("Error setting up internal debug service for subscribers: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -1988,6 +2027,24 @@ func (s *Server) sendAuthErrorEvent(c *client) {
|
||||
// rmsg contains header and the message. use client.msgParts(rmsg) to split them apart
|
||||
type msgHandler func(sub *subscription, client *client, acc *Account, subject, reply string, rmsg []byte)
|
||||
|
||||
// Create a wrapped callback handler for the subscription that will move it to an
|
||||
// internal recvQ for processing not inline with routes etc.
|
||||
func (s *Server) noInlineCallback(cb msgHandler) msgHandler {
|
||||
s.mu.RLock()
|
||||
if !s.eventsEnabled() {
|
||||
s.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
// Capture here for direct reference to avoid any unnecessary blocking inline with routes, gateways etc.
|
||||
recvq := s.sys.recvq
|
||||
s.mu.RUnlock()
|
||||
|
||||
return func(sub *subscription, c *client, acc *Account, subj, rply string, rmsg []byte) {
|
||||
// Need to copy.
|
||||
recvq.push(&inSysMsg{sub, c, acc, subj, rply, copyBytes(rmsg), cb})
|
||||
}
|
||||
}
|
||||
|
||||
// Create an internal subscription. sysSubscribeQ for queue groups.
|
||||
func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) {
|
||||
return s.systemSubscribe(subject, _EMPTY_, false, nil, cb)
|
||||
@@ -2027,9 +2084,10 @@ func (s *Server) systemSubscribe(subject, queue string, internalOnly bool, c *cl
|
||||
}
|
||||
|
||||
var q []byte
|
||||
if queue != "" {
|
||||
if queue != _EMPTY_ {
|
||||
q = []byte(queue)
|
||||
}
|
||||
|
||||
// Now create the subscription
|
||||
return c.processSub([]byte(subject), q, []byte(sid), cb, internalOnly)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user