From 467614ea87995b325487ef3c7f54f4e3a4a5323f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 8 Mar 2021 15:42:03 -0600 Subject: [PATCH] Make sure to check for old messages during processsing. Also changed the way we detect old messages. Signed-off-by: Derek Collison --- server/raft.go | 2 +- server/stream.go | 21 ++++++--------------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/server/raft.go b/server/raft.go index a367bcb7..a2aa502d 100644 --- a/server/raft.go +++ b/server/raft.go @@ -343,7 +343,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { reqs: make(chan *voteRequest, 8), votes: make(chan *voteResponse, 32), propc: make(chan *Entry, 8192), - entryc: make(chan *appendEntry, 8192), + entryc: make(chan *appendEntry, 32768), respc: make(chan *appendEntryResponse, 32768), applyc: make(chan *CommittedEntry, 8192), leadc: make(chan bool, 8), diff --git a/server/stream.go b/server/stream.go index 3dd5caf9..08770448 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1093,7 +1093,9 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) { mset.mu.Unlock() return } else { - mset.resetMirrorConsumer() + if mset.mirror.cname != _EMPTY_ && mset.mirror.cname == tokenAt(m.rply, 4) { + mset.resetMirrorConsumer() + } } } else { s.Warnf("Got error processing JetStream mirror msg: %v", err) @@ -1174,13 +1176,6 @@ func (mset *stream) setupMirrorConsumer() error { // Process inbound mirror messages from the wire. sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { - mset.mu.RLock() - // Ignore anything not current. - notCurrent := mset.mirror == nil || !bytes.Equal(sub.subject, mset.mirror.sub.subject) - mset.mu.RUnlock() - if notCurrent { - return - } hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. mset.queueInbound(mset.mirror.msgs, subject, reply, hdr, msg) }) @@ -1338,10 +1333,6 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { } sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { - // Ignore anything not current. - if !bytes.Equal(sub.subject, si.sub.subject) { - return - } hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. mset.queueInbound(si.msgs, subject, reply, hdr, msg) }) @@ -1357,7 +1348,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { req := &CreateConsumerRequest{ Stream: sname, Config: ConsumerConfig{ - DeliverSubject: string(sub.subject), + DeliverSubject: deliverSubject, AckPolicy: AckNone, AckWait: 48 * time.Hour, MaxDeliver: 1, @@ -1519,8 +1510,8 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { si.sseq = sseq } } else { - if dseq > si.dseq { - // FIXME(dlc) - No rapid fire. + // Check to see if we know this is from an old consumer. + if si.cname != _EMPTY_ && dseq > si.dseq && si.cname == tokenAt(m.rply, 4) { mset.setSourceConsumer(si.name, si.sseq+1) } mset.mu.Unlock()