Updates based on PR feedback

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-04 16:16:28 -08:00
parent 051d0159ef
commit e70e46ea4a
3 changed files with 26 additions and 36 deletions

View File

@@ -270,8 +270,7 @@ type client struct {
trace bool
echo bool
internal bool
noIcb bool
tags jwt.TagList
nameTag string
@@ -564,7 +563,6 @@ func (c *client) initClient() {
c.subs = make(map[string]*subscription)
c.echo = true
c.internal = true
c.setTraceLevel()
@@ -3029,7 +3027,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g
client.outBytes += msgSize
// Check for internal subscriptions.
if sub.icb != nil && c.internal {
if sub.icb != nil && !c.noIcb {
if gwrply {
// Note that we keep track of the GW routed reply in the destination
// connection (`client`). The routed reply subject is in `c.pa.reply`,

View File

@@ -28,13 +28,11 @@ type outMsg struct {
}
type sendq struct {
mu sync.Mutex
mch chan struct{}
head *outMsg
tail *outMsg
s *Server
msgs int
bytes int
mu sync.Mutex
mch chan struct{}
head *outMsg
tail *outMsg
s *Server
}
func (s *Server) newSendQ() *sendq {
@@ -52,7 +50,7 @@ func (sq *sendq) internalLoop() {
c := s.createInternalSystemClient()
c.registerWithAccount(s.SystemAccount())
c.internal = false
c.noIcb = true
defer c.closeConnection(ClientClosed)
@@ -95,12 +93,11 @@ func (sq *sendq) pending() *outMsg {
sq.mu.Lock()
head := sq.head
sq.head, sq.tail = nil, nil
sq.msgs, sq.bytes = 0, 0
sq.mu.Unlock()
return head
}
func (sq *sendq) send(subj, rply string, hdr, msg []byte) (int, int) {
func (sq *sendq) send(subj, rply string, hdr, msg []byte) {
out := &outMsg{subj, rply, nil, nil, nil}
// We will copy these for now.
if len(hdr) > 0 {
@@ -113,22 +110,20 @@ func (sq *sendq) send(subj, rply string, hdr, msg []byte) (int, int) {
}
sq.mu.Lock()
sq.msgs++
sq.bytes += len(subj) + len(rply) + len(hdr) + len(msg)
var doKick bool
if sq.head == nil {
sq.head = out
doKick = true
} else {
sq.tail.next = out
}
sq.tail = out
msgs, bytes := sq.msgs, sq.bytes
sq.mu.Unlock()
select {
case sq.mch <- struct{}{}:
default:
if doKick {
select {
case sq.mch <- struct{}{}:
default:
}
}
return msgs, bytes
}

View File

@@ -1924,17 +1924,14 @@ type inMsg struct {
// Linked list for inbound messages.
type inbound struct {
head *inMsg
tail *inMsg
msgs int
bytes int
head *inMsg
tail *inMsg
}
func (mset *stream) pending() *inMsg {
mset.mu.Lock()
head := mset.msgs.head
mset.msgs.head, mset.msgs.tail = nil, nil
mset.msgs.msgs, mset.msgs.bytes = 0, 0
mset.mu.Unlock()
return head
}
@@ -1952,19 +1949,21 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) {
}
mset.mu.Lock()
var doKick bool
if mset.msgs.head == nil {
mset.msgs.head = m
doKick = true
} else {
mset.msgs.tail.next = m
}
mset.msgs.tail = m
mset.msgs.msgs++
mset.msgs.bytes += len(subj) + len(rply) + len(hdr) + len(msg)
mset.mu.Unlock()
select {
case mset.mch <- struct{}{}:
default:
if doKick {
select {
case mset.mch <- struct{}{}:
default:
}
}
}
@@ -2364,9 +2363,7 @@ func (mset *stream) internalLoop() {
// Do this here to nil out below vs up in for loop.
next := im.next
im.next, im.hdr, im.msg = nil, nil, nil
if im = next; im == nil {
im = mset.pending()
}
im = next
}
case <-s.quitCh:
return