From 627d20f7d496e763bbd96a714cc56d2a208caa80 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 14 Apr 2021 10:43:55 -0700 Subject: [PATCH] Make sure to stop retries for mirror consumers when not leader and through old control messages. Signed-off-by: Derek Collison --- server/stream.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/server/stream.go b/server/stream.go index f015cd0b..abdd7693 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1188,6 +1188,13 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { return false } + // Ignore from old subscriptions. + // The reason we can not just compare subs is that on cross account imports they will not match. + if !mset.mirror.isCurrentSub(m.rply) { + mset.mu.Unlock() + return false + } + node := mset.node // Check for heartbeats and flow control messages. @@ -1210,13 +1217,6 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { return !needsRetry } - // Ignore from old subscriptions. - // The reason we can not just compare subs is that on cross account imports they will not match. - if !mset.mirror.isCurrentSub(m.rply) { - mset.mu.Unlock() - return false - } - mset.mirror.last = time.Now() sseq, dseq, dc, ts, pending := replyInfo(m.rply) @@ -1337,6 +1337,11 @@ func (mset *stream) setupMirrorConsumer() error { } // Make sure to delete any prior consumers if we know about them. mset.removeInternalConsumer(mset.mirror) + + // If we are no longer the leader stop trying. + if !mset.isLeader() { + return nil + } } // Determine subjects etc. @@ -1395,7 +1400,7 @@ func (mset *stream) setupMirrorConsumer() error { respCh := make(chan *JSApiConsumerCreateResponse, 1) reply := infoReplySubject() - mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { + crSub, _ := mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { mset.unsubscribe(sub) _, msg := c.msgParts(rmsg) @@ -1467,7 +1472,8 @@ func (mset *stream) setupMirrorConsumer() error { mset.mu.Unlock() } mset.setMirrorErr(ccr.Error) - case <-time.After(5 * time.Second): + case <-time.After(10 * time.Second): + mset.unsubscribe(crSub) return } }()