mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Make sure to check for old messages during processsing.
Also changed the way we detect old messages. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -343,7 +343,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
|
||||
reqs: make(chan *voteRequest, 8),
|
||||
votes: make(chan *voteResponse, 32),
|
||||
propc: make(chan *Entry, 8192),
|
||||
entryc: make(chan *appendEntry, 8192),
|
||||
entryc: make(chan *appendEntry, 32768),
|
||||
respc: make(chan *appendEntryResponse, 32768),
|
||||
applyc: make(chan *CommittedEntry, 8192),
|
||||
leadc: make(chan bool, 8),
|
||||
|
||||
@@ -1093,7 +1093,9 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) {
|
||||
mset.mu.Unlock()
|
||||
return
|
||||
} else {
|
||||
mset.resetMirrorConsumer()
|
||||
if mset.mirror.cname != _EMPTY_ && mset.mirror.cname == tokenAt(m.rply, 4) {
|
||||
mset.resetMirrorConsumer()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
s.Warnf("Got error processing JetStream mirror msg: %v", err)
|
||||
@@ -1174,13 +1176,6 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
|
||||
// Process inbound mirror messages from the wire.
|
||||
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
|
||||
mset.mu.RLock()
|
||||
// Ignore anything not current.
|
||||
notCurrent := mset.mirror == nil || !bytes.Equal(sub.subject, mset.mirror.sub.subject)
|
||||
mset.mu.RUnlock()
|
||||
if notCurrent {
|
||||
return
|
||||
}
|
||||
hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
|
||||
mset.queueInbound(mset.mirror.msgs, subject, reply, hdr, msg)
|
||||
})
|
||||
@@ -1338,10 +1333,6 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
|
||||
}
|
||||
|
||||
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
|
||||
// Ignore anything not current.
|
||||
if !bytes.Equal(sub.subject, si.sub.subject) {
|
||||
return
|
||||
}
|
||||
hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
|
||||
mset.queueInbound(si.msgs, subject, reply, hdr, msg)
|
||||
})
|
||||
@@ -1357,7 +1348,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
|
||||
req := &CreateConsumerRequest{
|
||||
Stream: sname,
|
||||
Config: ConsumerConfig{
|
||||
DeliverSubject: string(sub.subject),
|
||||
DeliverSubject: deliverSubject,
|
||||
AckPolicy: AckNone,
|
||||
AckWait: 48 * time.Hour,
|
||||
MaxDeliver: 1,
|
||||
@@ -1519,8 +1510,8 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
|
||||
si.sseq = sseq
|
||||
}
|
||||
} else {
|
||||
if dseq > si.dseq {
|
||||
// FIXME(dlc) - No rapid fire.
|
||||
// Check to see if we know this is from an old consumer.
|
||||
if si.cname != _EMPTY_ && dseq > si.dseq && si.cname == tokenAt(m.rply, 4) {
|
||||
mset.setSourceConsumer(si.name, si.sseq+1)
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
|
||||
Reference in New Issue
Block a user