Make sure to not interrupt flow control unless we miss.

Fix deadlock for mirrors.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-20 15:05:41 -07:00
parent 25d99931c2
commit 2d46c112d6
2 changed files with 12 additions and 4 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.1.RC3"
VERSION = "2.2.1.RC4"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -1084,6 +1084,7 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
// Check for heartbeats and flow control messages.
if m.isControlMsg() && mset.mirror.cname != _EMPTY_ {
var needsRetry bool
mset.mirror.last = time.Now()
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
@@ -1091,11 +1092,14 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
} else {
// For idle heartbeats make sure we did not miss anything.
if ldseq := parseInt64(getHeader(JSLastDeliveredSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != mset.mirror.dseq {
mset.retryMirrorConsumer()
needsRetry = true
}
}
mset.mu.Unlock()
return false
if needsRetry {
mset.retryMirrorConsumer()
}
return !needsRetry
}
// Ignore from old subscriptions.
@@ -1196,6 +1200,7 @@ func (mset *stream) setupMirrorConsumer() error {
if mset.mirror.sub != nil {
mset.unsubscribe(mset.mirror.sub)
mset.mirror.sub = nil
mset.mirror.dseq = 1
}
// Make sure to delete any prior consumers if we know about them.
mset.removeInternalConsumer(mset.mirror)
@@ -1299,6 +1304,7 @@ func (mset *stream) setupMirrorConsumer() error {
if err != nil {
mset.mirror.err = jsError(err)
mset.mirror.sub = nil
mset.mirror.cname = _EMPTY_
} else {
mset.mirror.err = nil
mset.mirror.sub = sub
@@ -1562,6 +1568,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
// Check for heartbeats and flow control messages.
if m.isControlMsg() && si.cname != _EMPTY_ {
var needsRetry bool
si.last = time.Now()
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
@@ -1569,11 +1576,12 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
} else {
// For idle heartbeats make sure we did not miss anything.
if ldseq := parseInt64(getHeader(JSLastDeliveredSeq, m.hdr)); ldseq > 0 && uint64(ldseq) != si.dseq {
needsRetry = true
mset.retrySourceConsumerAtSeq(si.name, si.sseq+1)
}
}
mset.mu.Unlock()
return false
return !needsRetry
}
// Ignore from old subscriptions.