mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
@@ -23,26 +23,23 @@ type outMsg struct {
|
||||
rply string
|
||||
hdr []byte
|
||||
msg []byte
|
||||
next *outMsg
|
||||
}
|
||||
|
||||
type sendq struct {
|
||||
mu sync.Mutex
|
||||
mch chan struct{}
|
||||
head *outMsg
|
||||
tail *outMsg
|
||||
s *Server
|
||||
mu sync.Mutex
|
||||
q *ipQueue // of *outMsg
|
||||
s *Server
|
||||
}
|
||||
|
||||
func (s *Server) newSendQ() *sendq {
|
||||
sq := &sendq{s: s, mch: make(chan struct{}, 1)}
|
||||
sq := &sendq{s: s, q: newIPQueue()}
|
||||
s.startGoRoutine(sq.internalLoop)
|
||||
return sq
|
||||
}
|
||||
|
||||
func (sq *sendq) internalLoop() {
|
||||
sq.mu.Lock()
|
||||
s, mch := sq.s, sq.mch
|
||||
s, q := sq.s, sq.q
|
||||
sq.mu.Unlock()
|
||||
|
||||
defer s.grWG.Done()
|
||||
@@ -57,8 +54,10 @@ func (sq *sendq) internalLoop() {
|
||||
select {
|
||||
case <-s.quitCh:
|
||||
return
|
||||
case <-mch:
|
||||
for pm := sq.pending(); pm != nil; {
|
||||
case <-q.ch:
|
||||
pms := q.pop()
|
||||
for _, pmi := range pms {
|
||||
pm := pmi.(*outMsg)
|
||||
c.pa.subject = []byte(pm.subj)
|
||||
c.pa.size = len(pm.msg) + len(pm.hdr)
|
||||
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
|
||||
@@ -76,28 +75,16 @@ func (sq *sendq) internalLoop() {
|
||||
}
|
||||
c.processInboundClientMsg(msg)
|
||||
c.pa.szb = nil
|
||||
// Do this here to nil out below vs up in for loop.
|
||||
next := pm.next
|
||||
pm.next, pm.hdr, pm.msg = nil, nil, nil
|
||||
if pm = next; pm == nil {
|
||||
pm = sq.pending()
|
||||
}
|
||||
}
|
||||
// TODO: should this be in the for-loop instead?
|
||||
c.flushClients(0)
|
||||
q.recycle(&pms)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sq *sendq) pending() *outMsg {
|
||||
sq.mu.Lock()
|
||||
head := sq.head
|
||||
sq.head, sq.tail = nil, nil
|
||||
sq.mu.Unlock()
|
||||
return head
|
||||
}
|
||||
|
||||
func (sq *sendq) send(subj, rply string, hdr, msg []byte) {
|
||||
out := &outMsg{subj, rply, nil, nil, nil}
|
||||
out := &outMsg{subj, rply, nil, nil}
|
||||
// We will copy these for now.
|
||||
if len(hdr) > 0 {
|
||||
hdr = copyBytes(hdr)
|
||||
@@ -107,22 +94,5 @@ func (sq *sendq) send(subj, rply string, hdr, msg []byte) {
|
||||
msg = copyBytes(msg)
|
||||
out.msg = msg
|
||||
}
|
||||
|
||||
sq.mu.Lock()
|
||||
var notify bool
|
||||
if sq.head == nil {
|
||||
sq.head = out
|
||||
notify = true
|
||||
} else {
|
||||
sq.tail.next = out
|
||||
}
|
||||
sq.tail = out
|
||||
sq.mu.Unlock()
|
||||
|
||||
if notify {
|
||||
select {
|
||||
case sq.mch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
sq.q.push(out)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user