Minimize proposals doing skip msgs

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-04-16 14:19:29 -07:00
parent 946335d62f
commit 60df8a2fcb

View File

@@ -1445,13 +1445,24 @@ func (mset *stream) setupMirrorConsumer() error {
mset.store.FastState(&state)
if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream {
node, store := mset.node, mset.store
var entries []*Entry
for seq := state.LastSeq + 1; seq <= ccr.ConsumerInfo.Delivered.Stream; seq++ {
if mset.node != nil {
mset.node.Propose(encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq-1, 0))
if node != nil {
entries = append(entries, &Entry{EntryNormal, encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq-1, 0)})
// So a single message does not get too big.
if len(entries) > 10_000 {
node.ProposeDirect(entries)
entries = entries[:0]
}
} else {
mset.lseq = mset.store.SkipMsg()
mset.lseq = store.SkipMsg()
}
}
// Send all at once.
if node != nil && len(entries) > 0 {
node.ProposeDirect(entries)
}
}
// Capture consumer name.
@@ -2455,8 +2466,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Check to see if we are over the max msg size.
if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize {
mset.mu.Unlock()
mset.clfs++
mset.mu.Unlock()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"}