diff --git a/server/stream.go b/server/stream.go index 36ce823e..e1752350 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1445,13 +1445,24 @@ func (mset *stream) setupMirrorConsumer() error { mset.store.FastState(&state) if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream { + node, store := mset.node, mset.store + var entries []*Entry for seq := state.LastSeq + 1; seq <= ccr.ConsumerInfo.Delivered.Stream; seq++ { - if mset.node != nil { - mset.node.Propose(encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq-1, 0)) + if node != nil { + entries = append(entries, &Entry{EntryNormal, encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq-1, 0)}) + // So a single message does not get too big. + if len(entries) > 10_000 { + node.ProposeDirect(entries) + entries = entries[:0] + } } else { - mset.lseq = mset.store.SkipMsg() + mset.lseq = store.SkipMsg() } } + // Send all at once. + if node != nil && len(entries) > 0 { + node.ProposeDirect(entries) + } } // Capture consumer name. @@ -2455,8 +2466,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Check to see if we are over the max msg size. if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize { - mset.mu.Unlock() mset.clfs++ + mset.mu.Unlock() if canRespond { resp.PubAck = &PubAck{Stream: name} resp.Error = &ApiError{Code: 400, Description: "message size exceeds maximum allowed"}