diff --git a/server/consumer.go b/server/consumer.go index af852fe6..9da1360b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1677,35 +1677,22 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _, reply string // In case we have to queue up this request. This is all on stack pre-allocated. wr := waitingRequest{client: c, reply: reply, n: batchSize, noWait: noWait, expires: expires} - // If we are in replay mode, defer to processReplay for delivery. - if o.replay { - o.waiting.add(&wr) + if wr.noWait { + if o.maxp > 0 && len(o.pending) >= o.maxp { + sendErr(409, "Exceeded MaxAckPending") + } o.mu.Unlock() - o.signalNewMessages() - return - } - - for i := 0; i < batchSize; i++ { - // See if we have more messages available. - if subj, hdr, msg, seq, dc, ts, err := o.getNextMsg(); err == nil { - o.deliverMsg(reply, subj, hdr, msg, seq, dc, ts) - // Need to discount this from the total n for the request. - wr.n-- - } else { - if wr.noWait { - switch err { - case errMaxAckPending: - sendErr(409, "Exceeded MaxAckPending") - default: - sendErr(404, "No Messages") - } - return - } - o.waiting.add(&wr) - break + empty := mset.state().Msgs == 0 + o.mu.Lock() + if empty { + sendErr(404, "No Messages") } } + + o.waiting.add(&wr) o.mu.Unlock() + + o.signalNewMessages() } // Increase the delivery count for this message.