From 4172e9b96280a366296198579c30a10703229cda Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 29 Nov 2018 13:59:45 -0800 Subject: [PATCH] Serialize outbound messages Signed-off-by: Derek Collison --- server/events.go | 53 ++++++++++++++++++++++++++++++++++-------------- server/server.go | 8 ++++++++ 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/server/events.go b/server/events.go index 8a4aa945..e9622664 100644 --- a/server/events.go +++ b/server/events.go @@ -66,28 +66,51 @@ type DataStats struct { Bytes int64 `json:"bytes"` } -// This will send a message. -// TODO(dlc) - Note we want the sequence numbers to be serialized but right now they may not be. -func (s *Server) sendInternalMsg(r *SublistResult, subj string, msg []byte) { +// Used for internally queueing up messages that the server wants to send. +type pubMsg struct { + r *SublistResult + sub string + msg []byte +} + +func (s *Server) internalSendLoop() { + defer s.grWG.Done() + s.mu.Lock() if s.sys == nil { return } c := s.sys.client acc := s.sys.account + sendq := s.sys.sendq + s.mu.Unlock() - // Prep internl structures needed to send message. - c.pa.subject = []byte(subj) - c.pa.size = len(msg) - c.pa.szb = []byte(strconv.FormatInt(int64(len(msg)), 10)) - // Add in NL - msg = append(msg, _CRLF_...) - - // Check to see if we need to map/route to another account. - if acc.imports.services != nil { - c.checkForImportServices(acc, msg) + for s.isRunning() { + select { + case pm := <-sendq: + // Prep internal structures needed to send message. + c.pa.subject = []byte(pm.sub) + c.pa.size = len(pm.msg) + c.pa.szb = []byte(strconv.FormatInt(int64(len(pm.msg)), 10)) + // Add in NL + pm.msg = append(pm.msg, _CRLF_...) + // Check to see if we need to map/route to another account. + if acc.imports.services != nil { + c.checkForImportServices(acc, pm.msg) + } + c.processMsgResults(acc, pm.r, pm.msg, []byte(pm.sub), nil, nil) + c.flushClients() + case <-s.quitCh: + return + } } - c.processMsgResults(acc, r, msg, []byte(subj), nil, nil) - c.flushClients() +} + +// This will queue up a message to be sent. +func (s *Server) sendInternalMsg(r *SublistResult, sub string, msg []byte) { + if s.sys == nil { + return + } + s.sys.sendq <- &pubMsg{r, sub, msg} } // accountConnectEvent will send an account client connect event if there is interest. diff --git a/server/server.go b/server/server.go index 15483558..0370b8e8 100644 --- a/server/server.go +++ b/server/server.go @@ -84,6 +84,7 @@ type internal struct { seq uint64 sid uint64 subs map[string]msgHandler + sendq chan *pubMsg } // Server is our main struct. @@ -514,12 +515,19 @@ func (s *Server) setSystemAccount(acc *Account) error { seq: 1, sid: 1, subs: make(map[string]msgHandler, 8), + sendq: make(chan *pubMsg, 128), } s.sys.client.initClient() s.sys.client.echo = false s.mu.Unlock() // Register with the account. s.sys.client.registerWithAccount(acc) + + // Start our internal loop to serialize outbound messages. + s.startGoRoutine(func() { + s.internalSendLoop() + }) + return nil }