diff --git a/server/events.go b/server/events.go index 0ff6a257..826e76a4 100644 --- a/server/events.go +++ b/server/events.go @@ -83,6 +83,17 @@ const ( // FIXME(dlc) - make configurable. var eventsHBInterval = 30 * time.Second +// Used if we have to queue things internally to avoid the route/gw path. +type inSysMsg struct { + sub *subscription + c *client + acc *Account + subj string + rply string + msg []byte + cb msgHandler +} + // Used to send and receive messages from inside the server. type internal struct { account *Account @@ -94,6 +105,7 @@ type internal struct { stmr *time.Timer replies map[string]msgHandler sendq *ipQueue[*pubMsg] + recvq *ipQueue[*inSysMsg] resetCh chan struct{} wg sync.WaitGroup sq *sendq @@ -300,6 +312,33 @@ type TypedEvent struct { Time time.Time `json:"timestamp"` } +// internalReceiveLoop will be responsible for dispatching all messages that +// a server receives and needs to internally process, e.g. internal subs. +func (s *Server) internalReceiveLoop() { + s.mu.RLock() + if s.sys == nil || s.sys.recvq == nil { + s.mu.RUnlock() + return + } + recvq := s.sys.recvq + s.mu.RUnlock() + + for s.eventsRunning() { + select { + case <-recvq.ch: + msgs := recvq.pop() + for _, m := range msgs { + if m.cb != nil { + m.cb(m.sub, m.c, m.acc, m.subj, m.rply, m.msg) + } + } + recvq.recycle(&msgs) + case <-s.quitCh: + return + } + } +} + // internalSendLoop will be responsible for serializing all messages that // a server wants to send. func (s *Server) internalSendLoop(wg *sync.WaitGroup) { @@ -841,26 +880,26 @@ func (s *Server) initEventTracking() { s.sys.inboxPre = subject // This is for remote updates for connection accounting. subject = fmt.Sprintf(accConnsEventSubjOld, "*") - if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil { + if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil { s.Errorf("Error setting up internal tracking for %s: %v", subject, err) } // This will be for responses for account info that we send out. subject = fmt.Sprintf(connsRespSubj, s.info.ID) - if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil { + if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // Listen for broad requests to respond with number of subscriptions for a given subject. - if _, err := s.sysSubscribe(accNumSubsReqSubj, s.nsubsRequest); err != nil { + if _, err := s.sysSubscribe(accNumSubsReqSubj, s.noInlineCallback(s.nsubsRequest)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // Listen for statsz from others. subject = fmt.Sprintf(serverStatsSubj, "*") - if _, err := s.sysSubscribe(subject, s.remoteServerUpdate); err != nil { + if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // Listen for all server shutdowns. subject = fmt.Sprintf(shutdownEventSubj, "*") - if _, err := s.sysSubscribe(subject, s.remoteServerShutdown); err != nil { + if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // Listen for account claims updates. @@ -870,14 +909,14 @@ func (s *Server) initEventTracking() { } if subscribeToUpdate { for _, sub := range []string{accUpdateEventSubjOld, accUpdateEventSubjNew} { - if _, err := s.sysSubscribe(fmt.Sprintf(sub, "*"), s.accountClaimUpdate); err != nil { + if _, err := s.sysSubscribe(fmt.Sprintf(sub, "*"), s.noInlineCallback(s.accountClaimUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } } } // Listen for ping messages that will be sent to all servers for statsz. // This subscription is kept for backwards compatibility. Got replaced by ...PING.STATZ from below - if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.statszReq); err != nil { + if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } monSrvc := map[string]msgHandler{ @@ -921,11 +960,11 @@ func (s *Server) initEventTracking() { } for name, req := range monSrvc { subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name) - if _, err := s.sysSubscribe(subject, req); err != nil { + if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } subject = fmt.Sprintf(serverPingReqSubj, name) - if _, err := s.sysSubscribe(subject, req); err != nil { + if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } } @@ -1017,14 +1056,14 @@ func (s *Server) initEventTracking() { "CONNS": s.connsRequest, } for name, req := range monAccSrvc { - if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), req); err != nil { + if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } } // For now only the STATZ subject has an account specific ping equivalent. if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"), - func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + s.noInlineCallback(func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { optz := &AccountStatzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil { @@ -1035,23 +1074,23 @@ func (s *Server) initEventTracking() { return stz, nil } }) - }); err != nil { + })); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // Listen for updates when leaf nodes connect for a given account. This will // force any gateway connections to move to `modeInterestOnly` subject = fmt.Sprintf(leafNodeConnectEventSubj, "*") - if _, err := s.sysSubscribe(subject, s.leafNodeConnected); err != nil { + if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.leafNodeConnected)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } // For tracking remote latency measurements. subject = fmt.Sprintf(remoteLatencyEventSubj, s.sys.shash) - if _, err := s.sysSubscribe(subject, s.remoteLatencyUpdate); err != nil { + if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteLatencyUpdate)); err != nil { s.Errorf("Error setting up internal latency tracking: %v", err) } // This is for simple debugging of number of subscribers that exist in the system. - if _, err := s.sysSubscribeInternal(accSubsSubj, s.debugSubscribers); err != nil { + if _, err := s.sysSubscribeInternal(accSubsSubj, s.noInlineCallback(s.debugSubscribers)); err != nil { s.Errorf("Error setting up internal debug service for subscribers: %v", err) } } @@ -1988,6 +2027,24 @@ func (s *Server) sendAuthErrorEvent(c *client) { // rmsg contains header and the message. use client.msgParts(rmsg) to split them apart type msgHandler func(sub *subscription, client *client, acc *Account, subject, reply string, rmsg []byte) +// Create a wrapped callback handler for the subscription that will move it to an +// internal recvQ for processing not inline with routes etc. +func (s *Server) noInlineCallback(cb msgHandler) msgHandler { + s.mu.RLock() + if !s.eventsEnabled() { + s.mu.RUnlock() + return nil + } + // Capture here for direct reference to avoid any unnecessary blocking inline with routes, gateways etc. + recvq := s.sys.recvq + s.mu.RUnlock() + + return func(sub *subscription, c *client, acc *Account, subj, rply string, rmsg []byte) { + // Need to copy. + recvq.push(&inSysMsg{sub, c, acc, subj, rply, copyBytes(rmsg), cb}) + } +} + // Create an internal subscription. sysSubscribeQ for queue groups. func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) { return s.systemSubscribe(subject, _EMPTY_, false, nil, cb) @@ -2027,9 +2084,10 @@ func (s *Server) systemSubscribe(subject, queue string, internalOnly bool, c *cl } var q []byte - if queue != "" { + if queue != _EMPTY_ { q = []byte(queue) } + // Now create the subscription return c.processSub([]byte(subject), q, []byte(sid), cb, internalOnly) } diff --git a/server/server.go b/server/server.go index 71c1fbad..2e62a68e 100644 --- a/server/server.go +++ b/server/server.go @@ -1260,13 +1260,13 @@ func (s *Server) setSystemAccount(acc *Account) error { servers: make(map[string]*serverUpdate), replies: make(map[string]msgHandler), sendq: newIPQueue[*pubMsg](s, "System sendQ"), + recvq: newIPQueue[*inSysMsg](s, "System recvQ"), resetCh: make(chan struct{}), sq: s.newSendQ(), statsz: eventsHBInterval, orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval, } - s.sys.wg.Add(1) s.mu.Unlock() @@ -1279,6 +1279,9 @@ func (s *Server) setSystemAccount(acc *Account) error { // We do our own wg here since we will stop first during shutdown. go s.internalSendLoop(&s.sys.wg) + // Start the internal loop for inbound messages. + go s.internalReceiveLoop() + // Start up our general subscriptions s.initEventTracking()