Merge pull request #3040 from nats-io/js_limit_catchup_msgs_count

Add catchup messages limit that was removed
This commit is contained in:
Ivan Kozlovic
2022-04-15 14:36:04 -06:00
committed by GitHub

View File

@@ -6336,7 +6336,9 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
defer s.grWG.Done()
const maxOutBytes = int64(32 * 1024 * 1024) // 32MB for now, these are all internal, from server to server
const maxOutMsgs = int32(128 * 1024)
outb := int64(0)
outm := int32(0)
// On abnormal exit make sure to update global total.
defer s.gcbSubLast(&outb)
@@ -6357,6 +6359,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
ackSub, _ := s.sysSubscribe(ackReply, func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
sz := ackReplySize(subject)
s.gcbSub(&outb, sz)
atomic.AddInt32(&outm, -1)
mset.updateCatchupPeer(sreq.Peer)
// Kick ourselves and anyone else who might have stalled on global state.
select {
@@ -6381,7 +6384,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
notActive.Reset(activityInterval)
var smv StoreMsg
for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && s.gcbTotal() <= maxTotalCatchupOutBytes; seq++ {
for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbTotal() <= maxTotalCatchupOutBytes; seq++ {
sm, err := mset.store.LoadMsg(seq, &smv)
// if this is not a deleted msg, bail out.
if err != nil && err != ErrStoreMsgNotFound && err != errDeletedMsg {
@@ -6400,6 +6403,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
l := int64(len(em))
reply := fmt.Sprintf(ackReplyT, l)
s.gcbAdd(&outb, l)
atomic.AddInt32(&outm, 1)
s.sendInternalMsgLocked(sendSubject, reply, nil, em)
if seq == last {
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())