mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 18:20:42 -07:00
Consumers would process next message batches inline.
In clustered mode this could block a route or gateway processor. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1677,35 +1677,22 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _, reply string
|
||||
// In case we have to queue up this request. This is all on stack pre-allocated.
|
||||
wr := waitingRequest{client: c, reply: reply, n: batchSize, noWait: noWait, expires: expires}
|
||||
|
||||
// If we are in replay mode, defer to processReplay for delivery.
|
||||
if o.replay {
|
||||
o.waiting.add(&wr)
|
||||
if wr.noWait {
|
||||
if o.maxp > 0 && len(o.pending) >= o.maxp {
|
||||
sendErr(409, "Exceeded MaxAckPending")
|
||||
}
|
||||
o.mu.Unlock()
|
||||
o.signalNewMessages()
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < batchSize; i++ {
|
||||
// See if we have more messages available.
|
||||
if subj, hdr, msg, seq, dc, ts, err := o.getNextMsg(); err == nil {
|
||||
o.deliverMsg(reply, subj, hdr, msg, seq, dc, ts)
|
||||
// Need to discount this from the total n for the request.
|
||||
wr.n--
|
||||
} else {
|
||||
if wr.noWait {
|
||||
switch err {
|
||||
case errMaxAckPending:
|
||||
sendErr(409, "Exceeded MaxAckPending")
|
||||
default:
|
||||
sendErr(404, "No Messages")
|
||||
}
|
||||
return
|
||||
}
|
||||
o.waiting.add(&wr)
|
||||
break
|
||||
empty := mset.state().Msgs == 0
|
||||
o.mu.Lock()
|
||||
if empty {
|
||||
sendErr(404, "No Messages")
|
||||
}
|
||||
}
|
||||
|
||||
o.waiting.add(&wr)
|
||||
o.mu.Unlock()
|
||||
|
||||
o.signalNewMessages()
|
||||
}
|
||||
|
||||
// Increase the delivery count for this message.
|
||||
|
||||
Reference in New Issue
Block a user