mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #2113 from nats-io/mirrors
Make sure to stop unneeded retries for mirror consumers.
This commit is contained in:
@@ -1188,6 +1188,13 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Ignore from old subscriptions.
|
||||
// The reason we can not just compare subs is that on cross account imports they will not match.
|
||||
if !mset.mirror.isCurrentSub(m.rply) {
|
||||
mset.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
node := mset.node
|
||||
|
||||
// Check for heartbeats and flow control messages.
|
||||
@@ -1210,13 +1217,6 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
|
||||
return !needsRetry
|
||||
}
|
||||
|
||||
// Ignore from old subscriptions.
|
||||
// The reason we can not just compare subs is that on cross account imports they will not match.
|
||||
if !mset.mirror.isCurrentSub(m.rply) {
|
||||
mset.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
mset.mirror.last = time.Now()
|
||||
sseq, dseq, dc, ts, pending := replyInfo(m.rply)
|
||||
|
||||
@@ -1337,6 +1337,11 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
}
|
||||
// Make sure to delete any prior consumers if we know about them.
|
||||
mset.removeInternalConsumer(mset.mirror)
|
||||
|
||||
// If we are no longer the leader stop trying.
|
||||
if !mset.isLeader() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Determine subjects etc.
|
||||
@@ -1395,7 +1400,7 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
|
||||
respCh := make(chan *JSApiConsumerCreateResponse, 1)
|
||||
reply := infoReplySubject()
|
||||
mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
|
||||
crSub, _ := mset.subscribeInternal(reply, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
|
||||
mset.unsubscribe(sub)
|
||||
_, msg := c.msgParts(rmsg)
|
||||
|
||||
@@ -1467,7 +1472,8 @@ func (mset *stream) setupMirrorConsumer() error {
|
||||
mset.mu.Unlock()
|
||||
}
|
||||
mset.setMirrorErr(ccr.Error)
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-time.After(10 * time.Second):
|
||||
mset.unsubscribe(crSub)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
Reference in New Issue
Block a user