Make sure to never process next message requests inline

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-03 20:47:56 -07:00
parent c9350a2d57
commit 07b34f707f
2 changed files with 71 additions and 15 deletions

View File

@@ -254,6 +254,7 @@ type consumer struct {
ackReplyT string
ackSubj string
nextMsgSubj string
nextMsgReqs *ipQueue[*nextMsgReq]
maxp int
pblimit int
maxpb int
@@ -750,6 +751,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Create our request waiting queue.
if o.isPullMode() {
o.waiting = newWaitQueue(config.MaxWaiting)
// Create our internal queue for next msg requests.
o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, mset.cfg.Name))
}
// Check if we have filtered subject that is a wildcard.
@@ -1099,6 +1102,7 @@ func (o *consumer) setLeader(isLeader bool) {
if node != nil && o.pch == nil {
o.pch = make(chan struct{}, 1)
}
pullMode := o.isPullMode()
o.mu.Unlock()
// Snapshot initial info.
@@ -1110,6 +1114,12 @@ func (o *consumer) setLeader(isLeader bool) {
// Now start up Go routine to process acks.
go o.processInboundAcks(qch)
if pullMode {
// Now start up Go routine to process inbound next message requests.
go o.processInboundNextMsgReqs(qch)
}
// If we are R>1 spin up our proposal loop.
if node != nil {
// Determine if we can send pending requests info to the group.
@@ -1145,6 +1155,7 @@ func (o *consumer) setLeader(isLeader bool) {
if !o.isDurable() {
stopAndClearTimer(&o.dtmr)
}
o.nextMsgReqs.drain()
} else if o.srv.gateway.enabled {
stopAndClearTimer(&o.gwdtmr)
}
@@ -2833,6 +2844,37 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
return nil
}
// Next message request.
type nextMsgReq struct {
reply string
msg []byte
}
var nextMsgReqPool sync.Pool
func newNextMsgReq(reply string, msg []byte) *nextMsgReq {
var nmr *nextMsgReq
m := nextMsgReqPool.Get()
if m != nil {
nmr = m.(*nextMsgReq)
} else {
nmr = &nextMsgReq{}
}
// When getting something from a pool it is criticical that all fields are
// initialized. Doing this way guarantees that if someone adds a field to
// the structure, the compiler will fail the build if this line is not updated.
(*nmr) = nextMsgReq{reply, msg}
return nmr
}
func (nmr *nextMsgReq) returnToPool() {
if nmr == nil {
return
}
nmr.reply, nmr.msg = _EMPTY_, nil
nextMsgReqPool.Put(nmr)
}
// processNextMsgReq will process a request for the next message available. A nil message payload means deliver
// a single message. If the payload is a formal request or a number parseable with Atoi(), then we will send a
// batch of messages without requiring another request to this endpoint, or an ACK.
@@ -2841,20 +2883,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
return
}
_, msg = c.msgParts(msg)
inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF
if !inlineOk {
// Check how long we have been away from the readloop for the route or gateway or leafnode.
// If too long move to a separate go routine.
if elapsed := time.Since(c.in.start); elapsed < noBlockThresh {
inlineOk = true
}
}
if inlineOk {
o.processNextMsgRequest(reply, msg)
} else {
go o.processNextMsgRequest(reply, copyBytes(msg))
}
o.nextMsgReqs.push(newNextMsgReq(reply, copyBytes(msg)))
}
func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
@@ -3224,6 +3253,30 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
}
}
// Process inbound next message requests.
func (o *consumer) processInboundNextMsgReqs(qch chan struct{}) {
// Grab the server lock to watch for server quit.
o.mu.RLock()
s := o.srv
o.mu.RUnlock()
for {
select {
case <-o.nextMsgReqs.ch:
reqs := o.nextMsgReqs.pop()
for _, req := range reqs {
o.processNextMsgRequest(req.reply, req.msg)
req.returnToPool()
}
o.nextMsgReqs.recycle(&reqs)
case <-qch:
return
case <-s.quitCh:
return
}
}
}
// Suppress auto cleanup on ack activity of any kind.
func (o *consumer) suppressDeletion() {
o.mu.Lock()
@@ -4310,6 +4363,9 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
n := o.node
qgroup := o.cfg.DeliverGroup
o.ackMsgs.unregister()
if o.nextMsgReqs != nil {
o.nextMsgReqs.unregister()
}
// For cleaning up the node assignment.
var ca *consumerAssignment

View File

@@ -6559,7 +6559,7 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
setHighStartSequence := false
simulateMaxRedeliveries := false
maxBadPubTimes := uint32(20)
badPubThresh := 5 * time.Second
badPubThresh := 500 * time.Millisecond
testTime := 5 * time.Minute // make sure to do --timeout=65m
t.Logf("Starting Test: Total Test Time %v", testTime)