From 395728bab93b0e64d8a0655496fa04a982a7bfd1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 14 Apr 2021 14:06:18 -0700 Subject: [PATCH] Allow control messages like heartbeats to pass the old sub test. Signed-off-by: Derek Collison --- server/norace_test.go | 4 ---- server/stream.go | 28 +++++++++++++++------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/server/norace_test.go b/server/norace_test.go index a2d43724..fba30873 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -1438,10 +1438,6 @@ func TestNoRaceJetStreamClusterStreamCreateAndLostQuorum(t *testing.T) { } func TestNoRaceJetStreamClusterSuperClusterMirrors(t *testing.T) { - // These pass locally but are flaky on Travis. - // Disable for now. - skip(t) - sc := createJetStreamSuperCluster(t, 3, 3) defer sc.shutdown() diff --git a/server/stream.go b/server/stream.go index abdd7693..00d6a2f7 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1188,19 +1188,21 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { return false } + isControl := m.isControlMsg() + // Ignore from old subscriptions. // The reason we can not just compare subs is that on cross account imports they will not match. - if !mset.mirror.isCurrentSub(m.rply) { + if !mset.mirror.isCurrentSub(m.rply) && !isControl { mset.mu.Unlock() return false } + mset.mirror.last = time.Now() node := mset.node // Check for heartbeats and flow control messages. - if m.isControlMsg() && mset.mirror.cname != _EMPTY_ { + if isControl { var needsRetry bool - mset.mirror.last = time.Now() // Flow controls have reply subjects. if m.rply != _EMPTY_ { mset.handleFlowControl(mset.mirror, m) @@ -1217,7 +1219,6 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { return !needsRetry } - mset.mirror.last = time.Now() sseq, dseq, dc, ts, pending := replyInfo(m.rply) if dc > 1 { @@ -1724,12 +1725,20 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { return false } + isControl := m.isControlMsg() + + // Ignore from old subscriptions. + if !si.isCurrentSub(m.rply) && !isControl { + mset.mu.Unlock() + return false + } + + si.last = time.Now() node := mset.node // Check for heartbeats and flow control messages. - if m.isControlMsg() && si.cname != _EMPTY_ { + if isControl { var needsRetry bool - si.last = time.Now() // Flow controls have reply subjects. if m.rply != _EMPTY_ { mset.handleFlowControl(si, m) @@ -1744,13 +1753,6 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { return !needsRetry } - // Ignore from old subscriptions. - if !si.isCurrentSub(m.rply) { - mset.mu.Unlock() - return false - } - - si.last = time.Now() sseq, dseq, dc, _, pending := replyInfo(m.rply) if dc > 1 {