From 4c6fd179d697c56f95b5aeff54e7ed5c0800df72 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 20 Mar 2021 10:30:09 -0700 Subject: [PATCH] Can not use sub comparisons for old messages from direct consumers. Signed-off-by: Derek Collison --- server/const.go | 2 +- server/jetstream_cluster_test.go | 2 +- server/stream.go | 68 ++++++++++++++++++-------------- 3 files changed, 40 insertions(+), 32 deletions(-) diff --git a/server/const.go b/server/const.go index 9c70e09a..3eec8e59 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.1.RC1" + VERSION = "2.2.1.RC2" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index baf40f2c..e2615577 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -4541,7 +4541,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 20*time.Second, 500*time.Millisecond, func() error { si, err := js2.StreamInfo("MY_MIRROR_TEST") if err != nil { t.Fatalf("Could not retrieve stream info") diff --git a/server/stream.go b/server/stream.go index 789dd70a..e1cb2eb6 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1058,6 +1058,12 @@ func (mset *stream) processMirrorMsgs() { } } +// Checks that the message is from our current direct consumer. We can not depend on sub comparison +// since cross account imports break. +func (si *sourceInfo) isCurrentSub(reply string) bool { + return !(si.cname != _EMPTY_ && strings.HasPrefix(reply, jsAckPre) && si.cname != tokenAt(reply, 4)) +} + // processInboundMirrorMsg handles processing messages bound for a stream. func (mset *stream) processInboundMirrorMsg(m *inMsg) { mset.mu.Lock() @@ -1075,25 +1081,17 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) { node := mset.node // Ignore from old subscriptions. - if mset.mirror.sub != m.sub { + // The reason we can not just compare subs is that on cross account imports they will not match. + if !mset.mirror.isCurrentSub(m.rply) { mset.mu.Unlock() return } // Check for heartbeats and flow control messages. - if len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 ")) { + if m.isControlMsg() { // Flow controls have reply subjects. if m.rply != _EMPTY_ { - // If we are clustered we want to delay signaling back the the upstream consumer. - if node != nil { - index, _, _ := node.Progress() - if mset.fcr == nil { - mset.fcr = make(map[uint64]string) - } - mset.fcr[index] = m.rply - } else { - mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) - } + mset.handleFlowControl(m) } mset.mu.Unlock() return @@ -1205,7 +1203,7 @@ func (mset *stream) setupMirrorConsumer() error { // Process inbound mirror messages from the wire. sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. - mset.queueInbound(mset.mirror.msgs, sub, subject, reply, hdr, msg) + mset.queueInbound(mset.mirror.msgs, subject, reply, hdr, msg) }) if err != nil { mset.mirror = nil @@ -1377,7 +1375,7 @@ func (mset *stream) setSourceConsumer(sname string, seq uint64) { sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, subject, reply string, rmsg []byte) { hdr, msg := c.msgParts(append(rmsg[:0:0], rmsg...)) // Need to copy. - mset.queueInbound(si.msgs, sub, subject, reply, hdr, msg) + mset.queueInbound(si.msgs, subject, reply, hdr, msg) }) if err != nil { si.err = jsError(err) @@ -1511,6 +1509,26 @@ func (mset *stream) processSourceMsgs(si *sourceInfo) { } } +// isControlMsg determines if this is a control message. +func (m *inMsg) isControlMsg() bool { + return len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 ")) +} + +// handleFlowControl will properly handle flow control messages for both R1 and R>1. +// Lock should be held. +func (mset *stream) handleFlowControl(m *inMsg) { + // If we are clustered we want to delay signaling back the the upstream consumer. + if node := mset.node; node != nil { + index, _, _ := node.Progress() + if mset.fcr == nil { + mset.fcr = make(map[uint64]string) + } + mset.fcr[index] = m.rply + } else { + mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) + } +} + // processInboundSourceMsg handles processing other stream messages bound for this stream. func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { mset.mu.Lock() @@ -1525,25 +1543,16 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) { node := mset.node // Ignore from old subscriptions. - if si.sub != m.sub { + if !si.isCurrentSub(m.rply) { mset.mu.Unlock() return } // Check for heartbeats and flow control messages. - if len(m.msg) == 0 && len(m.hdr) > 0 && bytes.HasPrefix(m.hdr, []byte("NATS/1.0 100 ")) { + if m.isControlMsg() { // Flow controls have reply subjects. if m.rply != _EMPTY_ { - // If we are clustered we want to delay signaling back the the upstream consumer. - if node != nil { - index, _, _ := node.Progress() - if mset.fcr == nil { - mset.fcr = make(map[uint64]string) - } - mset.fcr[index] = m.rply - } else { - mset.outq.send(&jsPubMsg{m.rply, _EMPTY_, _EMPTY_, nil, nil, nil, 0, nil}) - } + mset.handleFlowControl(m) } mset.mu.Unlock() return @@ -2025,7 +2034,6 @@ type inMsg struct { rply string hdr []byte msg []byte - sub *subscription next *inMsg } @@ -2044,8 +2052,8 @@ func (mset *stream) pending(msgs *inbound) *inMsg { return head } -func (mset *stream) queueInbound(ib *inbound, sub *subscription, subj, rply string, hdr, msg []byte) { - m := &inMsg{subj, rply, hdr, msg, sub, nil} +func (mset *stream) queueInbound(ib *inbound, subj, rply string, hdr, msg []byte) { + m := &inMsg{subj, rply, hdr, msg, nil} mset.mu.Lock() var notify bool @@ -2075,7 +2083,7 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) { if len(msg) > 0 { msg = append(msg[:0:0], msg...) } - mset.queueInbound(mset.msgs, nil, subj, rply, hdr, msg) + mset.queueInbound(mset.msgs, subj, rply, hdr, msg) } // processInboundJetStreamMsg handles processing messages bound for a stream.