Merge pull request #3088 from nats-io/tr-fix

Fix for subject transforms and JetStream delivery subjects.
This commit is contained in:
Derek Collison
2022-04-28 17:08:00 -07:00
committed by GitHub
2 changed files with 32 additions and 8 deletions

View File

@@ -2654,7 +2654,7 @@ func (c *client) addShadowSub(sub *subscription, ime *ime) (*subscription, error
nsub := *sub // copy
nsub.im = im
if !im.usePub && ime.dyn {
if !im.usePub && ime.dyn && im.tr != nil {
if im.rtr == nil {
im.rtr = im.tr.reverse()
}
@@ -4159,7 +4159,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}
if sub.im.tr != nil {
to, _ := sub.im.tr.transformSubject(string(dsubj))
to, _ := sub.im.tr.transformSubject(string(subject))
dsubj = append(_dsubj[:0], to...)
} else if sub.im.usePub {
dsubj = append(_dsubj[:0], subj...)
@@ -4300,7 +4300,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}
if sub.im.tr != nil {
to, _ := sub.im.tr.transformSubject(string(subj))
to, _ := sub.im.tr.transformSubject(string(subject))
dsubj = append(_dsubj[:0], to...)
} else if sub.im.usePub {
dsubj = append(_dsubj[:0], subj...)

View File

@@ -17006,7 +17006,8 @@ func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) {
users: [ {user: js, password: pwd} ]
exports [
# This is streaming to a delivery subject for a push based consumer.
{ stream: "deliver.ORDERS" }
{ stream: "deliver.*" }
{ stream: "foo.*" }
# This is to ack received messages. This is a service to support sync ack.
{ service: "$JS.ACK.ORDERS.*.>" }
# To support ordered consumers, flow control.
@@ -17016,8 +17017,9 @@ func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) {
IM: {
users: [ {user: im, password: pwd} ]
imports [
{ stream: { account: JS, subject: "deliver.ORDERS" }, to: "d" }
{ service: {account: JS, subject: "$JS.FC.>" }}
{ stream: { account: JS, subject: "deliver.ORDERS" }, to: "d.*" }
{ stream: { account: JS, subject: "foo.*" }, to: "bar.*" }
{ service: { account: JS, subject: "$JS.FC.>" }}
]
},
}
@@ -17061,10 +17063,10 @@ func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) {
var sub *nats.Subscription
if queue {
sub, err = nc2.QueueSubscribeSync("d", queueName)
sub, err = nc2.QueueSubscribeSync("d.ORDERS", queueName)
require_NoError(t, err)
} else {
sub, err = nc2.SubscribeSync("d")
sub, err = nc2.SubscribeSync("d.ORDERS")
require_NoError(t, err)
}
@@ -17074,6 +17076,28 @@ func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) {
if m.Subject != "foo" {
t.Fatalf("Subject not mapped correctly across account boundary, expected %q got %q", "foo", m.Subject)
}
// Now do one that would kick in a transform.
_, err = js.AddConsumer("ORDERS", &nats.ConsumerConfig{
DeliverSubject: "foo.ORDERS",
AckPolicy: nats.AckExplicitPolicy,
DeliverGroup: queueName,
})
require_NoError(t, err)
if queue {
sub, err = nc2.QueueSubscribeSync("bar.ORDERS", queueName)
require_NoError(t, err)
} else {
sub, err = nc2.SubscribeSync("bar.ORDERS")
require_NoError(t, err)
}
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
if m.Subject != "foo" {
t.Fatalf("Subject not mapped correctly across account boundary, expected %q got %q", "foo", m.Subject)
}
}
t.Run("noqueue", func(t *testing.T) {