diff --git a/server/consumer.go b/server/consumer.go index 3f4f9103..0bac6a9b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -254,6 +254,7 @@ type consumer struct { ackReplyT string ackSubj string nextMsgSubj string + nextMsgReqs *ipQueue[*nextMsgReq] maxp int pblimit int maxpb int @@ -750,6 +751,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // Create our request waiting queue. if o.isPullMode() { o.waiting = newWaitQueue(config.MaxWaiting) + // Create our internal queue for next msg requests. + o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, mset.cfg.Name)) } // Check if we have filtered subject that is a wildcard. @@ -1099,6 +1102,7 @@ func (o *consumer) setLeader(isLeader bool) { if node != nil && o.pch == nil { o.pch = make(chan struct{}, 1) } + pullMode := o.isPullMode() o.mu.Unlock() // Snapshot initial info. @@ -1110,6 +1114,12 @@ func (o *consumer) setLeader(isLeader bool) { // Now start up Go routine to process acks. go o.processInboundAcks(qch) + if pullMode { + // Now start up Go routine to process inbound next message requests. + go o.processInboundNextMsgReqs(qch) + + } + // If we are R>1 spin up our proposal loop. if node != nil { // Determine if we can send pending requests info to the group. @@ -1145,6 +1155,7 @@ func (o *consumer) setLeader(isLeader bool) { if !o.isDurable() { stopAndClearTimer(&o.dtmr) } + o.nextMsgReqs.drain() } else if o.srv.gateway.enabled { stopAndClearTimer(&o.gwdtmr) } @@ -2833,6 +2844,37 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest { return nil } +// Next message request. 
+type nextMsgReq struct { + reply string + msg []byte +} + +var nextMsgReqPool sync.Pool + +func newNextMsgReq(reply string, msg []byte) *nextMsgReq { + var nmr *nextMsgReq + m := nextMsgReqPool.Get() + if m != nil { + nmr = m.(*nextMsgReq) + } else { + nmr = &nextMsgReq{} + } + // When getting something from a pool it is critical that all fields are + // initialized. Doing it this way guarantees that if someone adds a field to + // the structure, the compiler will fail the build if this line is not updated. + (*nmr) = nextMsgReq{reply, msg} + return nmr +} + +func (nmr *nextMsgReq) returnToPool() { + if nmr == nil { + return + } + nmr.reply, nmr.msg = _EMPTY_, nil + nextMsgReqPool.Put(nmr) +} + // processNextMsgReq will process a request for the next message available. A nil message payload means deliver // a single message. If the payload is a formal request or a number parseable with Atoi(), then we will send a // batch of messages without requiring another request to this endpoint, or an ACK. @@ -2841,20 +2883,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _, return } _, msg = c.msgParts(msg) - - inlineOk := c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF - if !inlineOk { - // Check how long we have been away from the readloop for the route or gateway or leafnode. - // If too long move to a separate go routine. - if elapsed := time.Since(c.in.start); elapsed < noBlockThresh { - inlineOk = true - } - } - if inlineOk { - o.processNextMsgRequest(reply, msg) - } else { - go o.processNextMsgRequest(reply, copyBytes(msg)) - } + o.nextMsgReqs.push(newNextMsgReq(reply, copyBytes(msg))) } func (o *consumer) processNextMsgRequest(reply string, msg []byte) { @@ -3224,6 +3253,30 @@ func (o *consumer) processInboundAcks(qch chan struct{}) { } } + +// Process inbound next message requests. +func (o *consumer) processInboundNextMsgReqs(qch chan struct{}) { + // Grab the server lock to watch for server quit. 
+ o.mu.RLock() + s := o.srv + o.mu.RUnlock() + + for { + select { + case <-o.nextMsgReqs.ch: + reqs := o.nextMsgReqs.pop() + for _, req := range reqs { + o.processNextMsgRequest(req.reply, req.msg) + req.returnToPool() + } + o.nextMsgReqs.recycle(&reqs) + case <-qch: + return + case <-s.quitCh: + return + } + } +} + // Suppress auto cleanup on ack activity of any kind. func (o *consumer) suppressDeletion() { o.mu.Lock() @@ -4310,6 +4363,9 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { n := o.node qgroup := o.cfg.DeliverGroup o.ackMsgs.unregister() + if o.nextMsgReqs != nil { + o.nextMsgReqs.unregister() + } // For cleaning up the node assignment. var ca *consumerAssignment diff --git a/server/norace_test.go b/server/norace_test.go index 06da28f8..deb5812c 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -6559,7 +6559,7 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) { setHighStartSequence := false simulateMaxRedeliveries := false maxBadPubTimes := uint32(20) - badPubThresh := 5 * time.Second + badPubThresh := 500 * time.Millisecond testTime := 5 * time.Minute // make sure to do --timeout=65m t.Logf("Starting Test: Total Test Time %v", testTime)