Merge pull request #2116 from nats-io/mirror_hbs

Allow control messages like heartbeats to pass the old sub test.
This commit is contained in:
Derek Collison
2021-04-14 14:12:53 -07:00
committed by GitHub
2 changed files with 15 additions and 17 deletions

View File

@@ -1438,10 +1438,6 @@ func TestNoRaceJetStreamClusterStreamCreateAndLostQuorum(t *testing.T) {
}
func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) {
// These pass locally but are flaky on Travis.
// Disable for now.
skip(t)
sc := createJetStreamSuperCluster(t, 3, 3)
defer sc.shutdown()

View File

@@ -1188,19 +1188,21 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
return false
}
isControl := m.isControlMsg()
// Ignore from old subscriptions.
// The reason we can not just compare subs is that on cross account imports they will not match.
if !mset.mirror.isCurrentSub(m.rply) {
if !mset.mirror.isCurrentSub(m.rply) && !isControl {
mset.mu.Unlock()
return false
}
mset.mirror.last = time.Now()
node := mset.node
// Check for heartbeats and flow control messages.
if m.isControlMsg() && mset.mirror.cname != _EMPTY_ {
if isControl {
var needsRetry bool
mset.mirror.last = time.Now()
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
mset.handleFlowControl(mset.mirror, m)
@@ -1217,7 +1219,6 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
return !needsRetry
}
mset.mirror.last = time.Now()
sseq, dseq, dc, ts, pending := replyInfo(m.rply)
if dc > 1 {
@@ -1724,12 +1725,20 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
return false
}
isControl := m.isControlMsg()
// Ignore from old subscriptions.
if !si.isCurrentSub(m.rply) && !isControl {
mset.mu.Unlock()
return false
}
si.last = time.Now()
node := mset.node
// Check for heartbeats and flow control messages.
if m.isControlMsg() && si.cname != _EMPTY_ {
if isControl {
var needsRetry bool
si.last = time.Now()
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
mset.handleFlowControl(si, m)
@@ -1744,13 +1753,6 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
return !needsRetry
}
// Ignore from old subscriptions.
if !si.isCurrentSub(m.rply) {
mset.mu.Unlock()
return false
}
si.last = time.Now()
sseq, dseq, dc, _, pending := replyInfo(m.rply)
if dc > 1 {