mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Better accounting for max-bytes for pull consumers
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -3070,10 +3070,12 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
|
||||
// Deliver all the msgs we have now, once done or on a condition, we wait for new ones.
|
||||
for {
|
||||
var (
|
||||
pmsg *jsPubMsg
|
||||
dc uint64
|
||||
dsubj string
|
||||
delay time.Duration
|
||||
pmsg *jsPubMsg
|
||||
dc uint64
|
||||
dsubj string
|
||||
ackReply string
|
||||
delay time.Duration
|
||||
sz int
|
||||
)
|
||||
o.mu.Lock()
|
||||
// consumer is closed when mset is set to nil.
|
||||
@@ -3112,9 +3114,25 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// Update our cached num pending here first.
|
||||
if dc == 1 && o.npcm > 0 {
|
||||
o.npc--
|
||||
}
|
||||
// Pre-calculate ackReply
|
||||
ackReply = o.ackReply(pmsg.seq, o.dseq, dc, pmsg.ts, o.numPending())
|
||||
|
||||
// If headers only do not send msg payload.
|
||||
// Add in msg size itself as header.
|
||||
if o.cfg.HeadersOnly {
|
||||
convertToHeadersOnly(pmsg)
|
||||
}
|
||||
// Calculate payload size. This can be calculated on client side.
|
||||
// We do not include transport subject here since not generally known on client.
|
||||
sz = len(pmsg.subj) + +len(ackReply) + len(pmsg.hdr) + len(pmsg.msg)
|
||||
|
||||
if o.isPushMode() {
|
||||
dsubj = o.dsubj
|
||||
} else if wr := o.nextWaiting(len(pmsg.hdr) + len(pmsg.msg)); wr != nil {
|
||||
} else if wr := o.nextWaiting(sz); wr != nil {
|
||||
dsubj = wr.reply
|
||||
if done := wr.recycleIfDone(); done && o.node != nil {
|
||||
o.removeClusterPendingRequest(dsubj)
|
||||
@@ -3124,6 +3142,9 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
|
||||
} else {
|
||||
// We will redo this one.
|
||||
o.sseq--
|
||||
if dc == 1 && o.npcm > 0 {
|
||||
o.npc++
|
||||
}
|
||||
pmsg.returnToPool()
|
||||
goto waitForMsgs
|
||||
}
|
||||
@@ -3147,8 +3168,8 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
|
||||
|
||||
// If we have a rate limit set make sure we check that here.
|
||||
if o.rlimit != nil {
|
||||
now, sm := time.Now(), &pmsg.StoreMsg
|
||||
r := o.rlimit.ReserveN(now, len(sm.msg)+len(sm.hdr)+len(sm.subj)+len(dsubj)+len(o.ackReplyT))
|
||||
now := time.Now()
|
||||
r := o.rlimit.ReserveN(now, sz)
|
||||
delay := r.DelayFrom(now)
|
||||
if delay > 0 {
|
||||
o.mu.Unlock()
|
||||
@@ -3163,7 +3184,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
|
||||
}
|
||||
|
||||
// Do actual delivery.
|
||||
o.deliverMsg(dsubj, pmsg, dc, rp)
|
||||
o.deliverMsg(dsubj, ackReply, pmsg, dc, rp)
|
||||
|
||||
// Reset our idle heartbeat timer if set.
|
||||
if hb != nil {
|
||||
@@ -3283,49 +3304,44 @@ func (o *consumer) streamNumPending() uint64 {
|
||||
return o.npc
|
||||
}
|
||||
|
||||
func convertToHeadersOnly(pmsg *jsPubMsg) {
|
||||
// If headers only do not send msg payload.
|
||||
// Add in msg size itself as header.
|
||||
hdr, msg := pmsg.hdr, pmsg.msg
|
||||
var bb bytes.Buffer
|
||||
if len(hdr) == 0 {
|
||||
bb.WriteString(hdrLine)
|
||||
} else {
|
||||
bb.Write(hdr)
|
||||
bb.Truncate(len(hdr) - LEN_CR_LF)
|
||||
}
|
||||
bb.WriteString(JSMsgSize)
|
||||
bb.WriteString(": ")
|
||||
bb.WriteString(strconv.FormatInt(int64(len(msg)), 10))
|
||||
bb.WriteString(CR_LF)
|
||||
bb.WriteString(CR_LF)
|
||||
// Replace underlying buf which we can use directly when we send.
|
||||
// TODO(dlc) - Probably just use directly when forming bytes.Buffer?
|
||||
pmsg.buf = pmsg.buf[:0]
|
||||
pmsg.buf = append(pmsg.buf, bb.Bytes()...)
|
||||
// Replace with new header.
|
||||
pmsg.hdr = pmsg.buf
|
||||
// Cancel msg payload
|
||||
pmsg.msg = nil
|
||||
}
|
||||
|
||||
// Deliver a msg to the consumer.
|
||||
// Lock should be held and o.mset validated to be non-nil.
|
||||
func (o *consumer) deliverMsg(dsubj string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy) {
|
||||
func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy) {
|
||||
if o.mset == nil {
|
||||
pmsg.returnToPool()
|
||||
return
|
||||
}
|
||||
|
||||
// Update our cached num pending.
|
||||
if dc == 1 && o.npcm > 0 {
|
||||
o.npc--
|
||||
}
|
||||
|
||||
dseq := o.dseq
|
||||
o.dseq++
|
||||
|
||||
// If headers only do not send msg payload.
|
||||
// Add in msg size itself as header.
|
||||
if o.cfg.HeadersOnly {
|
||||
hdr, msg := pmsg.hdr, pmsg.msg
|
||||
var bb bytes.Buffer
|
||||
if len(hdr) == 0 {
|
||||
bb.WriteString(hdrLine)
|
||||
} else {
|
||||
bb.Write(hdr)
|
||||
bb.Truncate(len(hdr) - LEN_CR_LF)
|
||||
}
|
||||
bb.WriteString(JSMsgSize)
|
||||
bb.WriteString(": ")
|
||||
bb.WriteString(strconv.FormatInt(int64(len(msg)), 10))
|
||||
bb.WriteString(CR_LF)
|
||||
bb.WriteString(CR_LF)
|
||||
// Replace underlying buf which we can use directly when we send.
|
||||
// TODO(dlc) - Probably just use directly when forming bytes.Buffer?
|
||||
pmsg.buf = pmsg.buf[:0]
|
||||
pmsg.buf = append(pmsg.buf, bb.Bytes()...)
|
||||
// Replace with new header.
|
||||
pmsg.hdr = pmsg.buf
|
||||
// Cancel msg payload
|
||||
pmsg.msg = nil
|
||||
}
|
||||
|
||||
pmsg.dsubj, pmsg.reply, pmsg.o = dsubj, o.ackReply(pmsg.seq, dseq, dc, pmsg.ts, o.numPending()), o
|
||||
pmsg.dsubj, pmsg.reply, pmsg.o = dsubj, ackReply, o
|
||||
psz := pmsg.size()
|
||||
|
||||
if o.maxpb > 0 {
|
||||
|
||||
@@ -17327,8 +17327,8 @@ func TestJetStreamPullMaxBytes(t *testing.T) {
|
||||
require_NoError(t, err)
|
||||
|
||||
// Put in ~2MB, each ~100k
|
||||
msz := 99_980
|
||||
total, msg := 20, []byte(strings.Repeat("Z", msz))
|
||||
msz, dsz := 100_000, 99_950
|
||||
total, msg := 20, []byte(strings.Repeat("Z", dsz))
|
||||
|
||||
for i := 0; i < total; i++ {
|
||||
if _, err := js.Publish("TEST", msg); err != nil {
|
||||
@@ -17377,7 +17377,7 @@ func TestJetStreamPullMaxBytes(t *testing.T) {
|
||||
|
||||
m, err = sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
require_True(t, len(m.Data) == msz)
|
||||
require_True(t, len(m.Data) == dsz)
|
||||
require_True(t, len(m.Header) == 0)
|
||||
checkSubsPending(t, sub, 0)
|
||||
|
||||
@@ -17389,33 +17389,33 @@ func TestJetStreamPullMaxBytes(t *testing.T) {
|
||||
for i := 0; i < 5; i++ {
|
||||
m, err = sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
require_True(t, len(m.Data) == msz)
|
||||
require_True(t, len(m.Data) == dsz)
|
||||
require_True(t, len(m.Header) == 0)
|
||||
}
|
||||
checkSubsPending(t, sub, 0)
|
||||
|
||||
// Now ask for large batch but make sure we are limited by batch size.
|
||||
req = &JSApiConsumerGetNextRequest{Batch: 1_000, MaxBytes: msz * 5, NoWait: true}
|
||||
req = &JSApiConsumerGetNextRequest{Batch: 1_000, MaxBytes: msz * 4, NoWait: true}
|
||||
jreq, _ = json.Marshal(req)
|
||||
nc.PublishRequest(subj, reply, jreq)
|
||||
checkSubsPending(t, sub, 5)
|
||||
for i := 0; i < 5; i++ {
|
||||
m, err = sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
require_True(t, len(m.Data) == msz)
|
||||
require_True(t, len(m.Data) == dsz)
|
||||
require_True(t, len(m.Header) == 0)
|
||||
}
|
||||
checkSubsPending(t, sub, 0)
|
||||
|
||||
req = &JSApiConsumerGetNextRequest{Batch: 1_000, MaxBytes: msz + 20, NoWait: true}
|
||||
req = &JSApiConsumerGetNextRequest{Batch: 1_000, MaxBytes: msz, NoWait: true}
|
||||
jreq, _ = json.Marshal(req)
|
||||
nc.PublishRequest(subj, reply, jreq)
|
||||
checkSubsPending(t, sub, 1)
|
||||
checkSubsPending(t, sub, 2)
|
||||
m, err = sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
require_True(t, len(m.Data) == msz)
|
||||
require_True(t, len(m.Data) == dsz)
|
||||
require_True(t, len(m.Header) == 0)
|
||||
checkSubsPending(t, sub, 0)
|
||||
checkSubsPending(t, sub, 1)
|
||||
}
|
||||
|
||||
func TestJetStreamStreamRepublishCycle(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user