Make sure to stop retries for mirror consumers when not leader and through old control messages.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-04-14 10:43:55 -07:00
parent 1127c5d771
commit 627d20f7d4

View File

@@ -1188,6 +1188,13 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
return false
}
// Ignore from old subscriptions.
// The reason we can not just compare subs is that on cross account imports they will not match.
if !mset.mirror.isCurrentSub(m.rply) {
mset.mu.Unlock()
return false
}
node := mset.node
// Check for heartbeats and flow control messages.
@@ -1210,13 +1217,6 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
return !needsRetry
}
// Ignore from old subscriptions.
// The reason we can not just compare subs is that on cross account imports they will not match.
if !mset.mirror.isCurrentSub(m.rply) {
mset.mu.Unlock()
return false
}
mset.mirror.last = time.Now()
sseq, dseq, dc, ts, pending := replyInfo(m.rply)
@@ -1337,6 +1337,11 @@ func (mset *stream) setupMirrorConsumer() error {
}
// Make sure to delete any prior consumers if we know about them.
mset.removeInternalConsumer(mset.mirror)
// If we are no longer the leader stop trying.
if !mset.isLeader() {
return nil
}
}
// Determine subjects etc.
@@ -1395,7 +1400,7 @@ func (mset *stream) setupMirrorConsumer() error {
respCh := make(chan *JSApiConsumerCreateResponse, 1)
reply := infoReplySubject()
mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
crSub, _ := mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
mset.unsubscribe(sub)
_, msg := c.msgParts(rmsg)
@@ -1467,7 +1472,8 @@ func (mset *stream) setupMirrorConsumer() error {
mset.mu.Unlock()
}
mset.setMirrorErr(ccr.Error)
case <-time.After(5 * time.Second):
case <-time.After(10 * time.Second):
mset.unsubscribe(crSub)
return
}
}()