diff --git a/server/const.go b/server/const.go index afbd00da..58045f33 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.1.RC3" + VERSION = "2.2.1.RC4" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/stream.go b/server/stream.go index 82835d15..bf40a51e 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1084,6 +1084,7 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { // Check for heartbeats and flow control messages. if m.isControlMsg() && mset.mirror.cname != _EMPTY_ { + var needsRetry bool mset.mirror.last = time.Now() // Flow controls have reply subjects. if m.rply != _EMPTY_ { @@ -1091,11 +1092,14 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { } else { // For idle heartbeats make sure we did not miss anything. if ldseq := parseInt64(getHeader(JSLastDeliveredSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != mset.mirror.dseq { - mset.retryMirrorConsumer() + needsRetry = true } } mset.mu.Unlock() - return false + if needsRetry { + mset.retryMirrorConsumer() + } + return !needsRetry } // Ignore from old subscriptions. @@ -1196,6 +1200,7 @@ func (mset *stream) setupMirrorConsumer() error { if mset.mirror.sub != nil { mset.unsubscribe(mset.mirror.sub) mset.mirror.sub = nil + mset.mirror.dseq = 1 } // Make sure to delete any prior consumers if we know about them. mset.removeInternalConsumer(mset.mirror) @@ -1299,6 +1304,7 @@ func (mset *stream) setupMirrorConsumer() error { if err != nil { mset.mirror.err = jsError(err) mset.mirror.sub = nil + mset.mirror.cname = _EMPTY_ } else { mset.mirror.err = nil mset.mirror.sub = sub @@ -1562,6 +1568,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { // Check for heartbeats and flow control messages. if m.isControlMsg() && si.cname != _EMPTY_ { + var needsRetry bool si.last = time.Now() // Flow controls have reply subjects. if m.rply != _EMPTY_ { @@ -1569,11 +1576,12 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { } else { // For idle heartbeats make sure we did not miss anything. if ldseq := parseInt64(getHeader(JSLastDeliveredSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != si.dseq { + needsRetry = true mset.retrySourceConsumerAtSeq(si.name, si.sseq+1) } } mset.mu.Unlock() - return false + return !needsRetry } // Ignore from old subscriptions.