Serialize outbound messages

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-11-29 13:59:45 -08:00
parent 16e9bbaf4c
commit 4172e9b962
2 changed files with 46 additions and 15 deletions

View File

@@ -66,28 +66,51 @@ type DataStats struct {
Bytes int64 `json:"bytes"`
}
// This will send a message.
// TODO(dlc) - Note we want the sequence numbers to be serialized but right now they may not be.
func (s *Server) sendInternalMsg(r *SublistResult, subj string, msg []byte) {
// Used for internally queueing up messages that the server wants to send.
type pubMsg struct {
r *SublistResult
sub string
msg []byte
}
func (s *Server) internalSendLoop() {
defer s.grWG.Done()
s.mu.Lock()
if s.sys == nil {
return
}
c := s.sys.client
acc := s.sys.account
sendq := s.sys.sendq
s.mu.Unlock()
// Prep internl structures needed to send message.
c.pa.subject = []byte(subj)
c.pa.size = len(msg)
c.pa.szb = []byte(strconv.FormatInt(int64(len(msg)), 10))
// Add in NL
msg = append(msg, _CRLF_...)
// Check to see if we need to map/route to another account.
if acc.imports.services != nil {
c.checkForImportServices(acc, msg)
for s.isRunning() {
select {
case pm := <-sendq:
// Prep internal structures needed to send message.
c.pa.subject = []byte(pm.sub)
c.pa.size = len(pm.msg)
c.pa.szb = []byte(strconv.FormatInt(int64(len(pm.msg)), 10))
// Add in NL
pm.msg = append(pm.msg, _CRLF_...)
// Check to see if we need to map/route to another account.
if acc.imports.services != nil {
c.checkForImportServices(acc, pm.msg)
}
c.processMsgResults(acc, pm.r, pm.msg, []byte(pm.sub), nil, nil)
c.flushClients()
case <-s.quitCh:
return
}
}
c.processMsgResults(acc, r, msg, []byte(subj), nil, nil)
c.flushClients()
}
// This will queue up a message to be sent.
func (s *Server) sendInternalMsg(r *SublistResult, sub string, msg []byte) {
if s.sys == nil {
return
}
s.sys.sendq <- &pubMsg{r, sub, msg}
}
// accountConnectEvent will send an account client connect event if there is interest.

View File

@@ -84,6 +84,7 @@ type internal struct {
seq uint64
sid uint64
subs map[string]msgHandler
sendq chan *pubMsg
}
// Server is our main struct.
@@ -514,12 +515,19 @@ func (s *Server) setSystemAccount(acc *Account) error {
seq: 1,
sid: 1,
subs: make(map[string]msgHandler, 8),
sendq: make(chan *pubMsg, 128),
}
s.sys.client.initClient()
s.sys.client.echo = false
s.mu.Unlock()
// Register with the account.
s.sys.client.registerWithAccount(acc)
// Start our internal loop to serialize outbound messages.
s.startGoRoutine(func() {
s.internalSendLoop()
})
return nil
}