Can not use sub comparisons for old messages from direct consumers.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-03-20 10:30:09 -07:00
parent ba8d0ef409
commit 4c6fd179d6
3 changed files with 40 additions and 32 deletions

View File

@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.1.RC1"
VERSION = "2.2.1.RC2"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -4541,7 +4541,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 20*time.Second, 500*time.Millisecond, func() error {
si, err := js2.StreamInfo("MY_MIRROR_TEST")
if err != nil {
t.Fatalf("Could not retrieve stream info")

View File

@@ -1058,6 +1058,12 @@ func (mset *stream) processMirrorMsgs() {
}
}
// Checks that the message is from our current direct consumer. We can not depend on sub comparison
// since cross account imports break.
func (si *sourceInfo) isCurrentSub(reply string) bool {
return !(si.cname != _EMPTY_ && strings.HasPrefix(reply, jsAckPre) && si.cname != tokenAt(reply, 4))
}
// processInboundMirrorMsg handles processing messages bound for a stream.
func (mset *stream) processInboundMirrorMsg(m *inMsg) {
mset.mu.Lock()
@@ -1075,25 +1081,17 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) {
node := mset.node
// Ignore from old subscriptions.
if mset.mirror.sub != m.sub {
// The reason we can not just compare subs is that on cross account imports they will not match.
if !mset.mirror.isCurrentSub(m.rply) {
mset.mu.Unlock()
return
}
// Check for heartbeats and flow control messages.
if len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 ")) {
if m.isControlMsg() {
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
// If we are clustered we want to delay signaling back the the upstream consumer.
if node != nil {
index, _, _ := node.Progress()
if mset.fcr == nil {
mset.fcr = make(map[uint64]string)
}
mset.fcr[index] = m.rply
} else {
mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
}
mset.handleFlowControl(m)
}
mset.mu.Unlock()
return
@@ -1205,7 +1203,7 @@ func (mset *stream) setupMirrorConsumer() error {
// Process inbound mirror messages from the wire.
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
mset.queueInbound(mset.mirror.msgs, sub, subject, reply, hdr, msg)
mset.queueInbound(mset.mirror.msgs, subject, reply, hdr, msg)
})
if err != nil {
mset.mirror = nil
@@ -1377,7 +1375,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) {
sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) {
hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy.
mset.queueInbound(si.msgs, sub, subject, reply, hdr, msg)
mset.queueInbound(si.msgs, subject, reply, hdr, msg)
})
if err != nil {
si.err = jsError(err)
@@ -1511,6 +1509,26 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) {
}
}
// isControlMsg determines if this is a control message.
func (m *inMsg) isControlMsg() bool {
return len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 "))
}
// handleFlowControl will properly handle flow control messages for both R1 and R>1.
// Lock should be held.
func (mset *stream) handleFlowControl(m *inMsg) {
// If we are clustered we want to delay signaling back the the upstream consumer.
if node := mset.node; node != nil {
index, _, _ := node.Progress()
if mset.fcr == nil {
mset.fcr = make(map[uint64]string)
}
mset.fcr[index] = m.rply
} else {
mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
}
}
// processInboundSourceMsg handles processing other stream messages bound for this stream.
func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
mset.mu.Lock()
@@ -1525,25 +1543,16 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) {
node := mset.node
// Ignore from old subscriptions.
if si.sub != m.sub {
if !si.isCurrentSub(m.rply) {
mset.mu.Unlock()
return
}
// Check for heartbeats and flow control messages.
if len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 ")) {
if m.isControlMsg() {
// Flow controls have reply subjects.
if m.rply != _EMPTY_ {
// If we are clustered we want to delay signaling back the the upstream consumer.
if node != nil {
index, _, _ := node.Progress()
if mset.fcr == nil {
mset.fcr = make(map[uint64]string)
}
mset.fcr[index] = m.rply
} else {
mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil})
}
mset.handleFlowControl(m)
}
mset.mu.Unlock()
return
@@ -2025,7 +2034,6 @@ type inMsg struct {
rply string
hdr []byte
msg []byte
sub *subscription
next *inMsg
}
@@ -2044,8 +2052,8 @@ func (mset *stream) pending(msgs *inbound) *inMsg {
return head
}
func (mset *stream) queueInbound(ib *inbound, sub *subscription, subj, rply string, hdr, msg []byte) {
m := &inMsg{subj, rply, hdr, msg, sub, nil}
func (mset *stream) queueInbound(ib *inbound, subj, rply string, hdr, msg []byte) {
m := &inMsg{subj, rply, hdr, msg, nil}
mset.mu.Lock()
var notify bool
@@ -2075,7 +2083,7 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) {
if len(msg) > 0 {
msg = append(msg[:0:0], msg...)
}
mset.queueInbound(mset.msgs, nil, subj, rply, hdr, msg)
mset.queueInbound(mset.msgs, subj, rply, hdr, msg)
}
// processInboundJetStreamMsg handles processing messages bound for a stream.