Files
nats-server/server/sendq.go
2021-07-01 14:29:09 -07:00

129 lines
2.6 KiB
Go

// Copyright 2020-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"strconv"
"sync"
)
type outMsg struct {
subj string
rply string
hdr []byte
msg []byte
next *outMsg
}
type sendq struct {
mu sync.Mutex
mch chan struct{}
head *outMsg
tail *outMsg
s *Server
}
func (s *Server) newSendQ() *sendq {
sq := &sendq{s: s, mch: make(chan struct{}, 1)}
s.startGoRoutine(sq.internalLoop)
return sq
}
func (sq *sendq) internalLoop() {
sq.mu.Lock()
s, mch := sq.s, sq.mch
sq.mu.Unlock()
defer s.grWG.Done()
c := s.createInternalSystemClient()
c.registerWithAccount(s.SystemAccount())
c.noIcb = true
defer c.closeConnection(ClientClosed)
for s.isRunning() {
select {
case <-s.quitCh:
return
case <-mch:
for pm := sq.pending(); pm != nil; {
c.pa.subject = []byte(pm.subj)
c.pa.size = len(pm.msg) + len(pm.hdr)
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
c.pa.reply = []byte(pm.rply)
var msg []byte
if len(pm.hdr) > 0 {
c.pa.hdr = len(pm.hdr)
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
msg = append(pm.hdr, pm.msg...)
msg = append(msg, _CRLF_...)
} else {
c.pa.hdr = -1
c.pa.hdb = nil
msg = append(pm.msg, _CRLF_...)
}
c.processInboundClientMsg(msg)
c.pa.szb = nil
// Do this here to nil out below vs up in for loop.
next := pm.next
pm.next, pm.hdr, pm.msg = nil, nil, nil
if pm = next; pm == nil {
pm = sq.pending()
}
}
c.flushClients(0)
}
}
}
func (sq *sendq) pending() *outMsg {
sq.mu.Lock()
head := sq.head
sq.head, sq.tail = nil, nil
sq.mu.Unlock()
return head
}
func (sq *sendq) send(subj, rply string, hdr, msg []byte) {
out := &outMsg{subj, rply, nil, nil, nil}
// We will copy these for now.
if len(hdr) > 0 {
hdr = append(hdr[:0:0], hdr...)
out.hdr = hdr
}
if len(msg) > 0 {
msg = append(msg[:0:0], msg...)
out.msg = msg
}
sq.mu.Lock()
var notify bool
if sq.head == nil {
sq.head = out
notify = true
} else {
sq.tail.next = out
}
sq.tail = out
sq.mu.Unlock()
if notify {
select {
case sq.mch <- struct{}{}:
default:
}
}
}