diff --git a/server/sendq.go b/server/sendq.go index 12413eac..0b358547 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -23,26 +23,23 @@ type outMsg struct { rply string hdr []byte msg []byte - next *outMsg } type sendq struct { - mu sync.Mutex - mch chan struct{} - head *outMsg - tail *outMsg - s *Server + mu sync.Mutex + q *ipQueue // of *outMsg + s *Server } func (s *Server) newSendQ() *sendq { - sq := &sendq{s: s, mch: make(chan struct{}, 1)} + sq := &sendq{s: s, q: newIPQueue()} s.startGoRoutine(sq.internalLoop) return sq } func (sq *sendq) internalLoop() { sq.mu.Lock() - s, mch := sq.s, sq.mch + s, q := sq.s, sq.q sq.mu.Unlock() defer s.grWG.Done() @@ -57,8 +54,10 @@ func (sq *sendq) internalLoop() { select { case <-s.quitCh: return - case <-mch: - for pm := sq.pending(); pm != nil; { + case <-q.ch: + pms := q.pop() + for _, pmi := range pms { + pm := pmi.(*outMsg) c.pa.subject = []byte(pm.subj) c.pa.size = len(pm.msg) + len(pm.hdr) c.pa.szb = []byte(strconv.Itoa(c.pa.size)) @@ -76,28 +75,16 @@ func (sq *sendq) internalLoop() { } c.processInboundClientMsg(msg) c.pa.szb = nil - // Do this here to nil out below vs up in for loop. - next := pm.next - pm.next, pm.hdr, pm.msg = nil, nil, nil - if pm = next; pm == nil { - pm = sq.pending() - } } + // TODO: should this be in the for-loop instead? c.flushClients(0) + q.recycle(&pms) } } } -func (sq *sendq) pending() *outMsg { - sq.mu.Lock() - head := sq.head - sq.head, sq.tail = nil, nil - sq.mu.Unlock() - return head -} - func (sq *sendq) send(subj, rply string, hdr, msg []byte) { - out := &outMsg{subj, rply, nil, nil, nil} + out := &outMsg{subj, rply, nil, nil} // We will copy these for now. if len(hdr) > 0 { hdr = copyBytes(hdr) @@ -107,22 +94,5 @@ func (sq *sendq) send(subj, rply string, hdr, msg []byte) { msg = copyBytes(msg) out.msg = msg } - - sq.mu.Lock() - var notify bool - if sq.head == nil { - sq.head = out - notify = true - } else { - sq.tail.next = out - } - sq.tail = out - sq.mu.Unlock() - - if notify { - select { - case sq.mch <- struct{}{}: - default: - } - } + sq.q.push(out) }