mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
130 lines
2.6 KiB
Go
130 lines
2.6 KiB
Go
// Copyright 2020-2021 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// outMsg is a single queued outbound message. Messages are chained through
// next to form the singly linked list held by a sendq.
type outMsg struct {
	// subj is the subject to publish on and rply the optional reply subject.
	subj, rply string
	// hdr holds the optional header bytes and msg the payload bytes.
	hdr, msg []byte
	// next points at the message queued after this one, or nil at the tail.
	next *outMsg
}
|
|
|
|
type sendq struct {
|
|
mu sync.Mutex
|
|
mch chan struct{}
|
|
head *outMsg
|
|
tail *outMsg
|
|
s *Server
|
|
}
|
|
|
|
func (s *Server) newSendQ() *sendq {
|
|
sq := &sendq{s: s, mch: make(chan struct{}, 1)}
|
|
s.startGoRoutine(sq.internalLoop)
|
|
return sq
|
|
}
|
|
|
|
func (sq *sendq) internalLoop() {
|
|
sq.mu.Lock()
|
|
s, mch := sq.s, sq.mch
|
|
sq.mu.Unlock()
|
|
|
|
defer s.grWG.Done()
|
|
|
|
c := s.createInternalSystemClient()
|
|
c.registerWithAccount(s.SystemAccount())
|
|
c.noIcb = true
|
|
|
|
defer c.closeConnection(ClientClosed)
|
|
|
|
for s.isRunning() {
|
|
select {
|
|
case <-s.quitCh:
|
|
return
|
|
case <-mch:
|
|
for pm := sq.pending(); pm != nil; {
|
|
c.pa.subject = []byte(pm.subj)
|
|
c.pa.size = len(pm.msg) + len(pm.hdr)
|
|
c.pa.szb = []byte(strconv.Itoa(c.pa.size))
|
|
c.pa.reply = []byte(pm.rply)
|
|
var msg []byte
|
|
if len(pm.hdr) > 0 {
|
|
c.pa.hdr = len(pm.hdr)
|
|
c.pa.hdb = []byte(strconv.Itoa(c.pa.hdr))
|
|
msg = append(pm.hdr, pm.msg...)
|
|
msg = append(msg, _CRLF_...)
|
|
} else {
|
|
c.pa.hdr = -1
|
|
c.pa.hdb = nil
|
|
msg = append(pm.msg, _CRLF_...)
|
|
}
|
|
c.processInboundClientMsg(msg)
|
|
c.pa.szb = nil
|
|
// Do this here to nil out below vs up in for loop.
|
|
next := pm.next
|
|
pm.next, pm.hdr, pm.msg = nil, nil, nil
|
|
if pm = next; pm == nil {
|
|
pm = sq.pending()
|
|
}
|
|
}
|
|
c.flushClients(10 * time.Millisecond)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sq *sendq) pending() *outMsg {
|
|
sq.mu.Lock()
|
|
head := sq.head
|
|
sq.head, sq.tail = nil, nil
|
|
sq.mu.Unlock()
|
|
return head
|
|
}
|
|
|
|
func (sq *sendq) send(subj, rply string, hdr, msg []byte) {
|
|
out := &outMsg{subj, rply, nil, nil, nil}
|
|
// We will copy these for now.
|
|
if len(hdr) > 0 {
|
|
hdr = append(hdr[:0:0], hdr...)
|
|
out.hdr = hdr
|
|
}
|
|
if len(msg) > 0 {
|
|
msg = append(msg[:0:0], msg...)
|
|
out.msg = msg
|
|
}
|
|
|
|
sq.mu.Lock()
|
|
var notify bool
|
|
if sq.head == nil {
|
|
sq.head = out
|
|
notify = true
|
|
} else {
|
|
sq.tail.next = out
|
|
}
|
|
sq.tail = out
|
|
sq.mu.Unlock()
|
|
|
|
if notify {
|
|
select {
|
|
case sq.mch <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|