diff --git a/server/client.go b/server/client.go index e9807704..ad834941 100644 --- a/server/client.go +++ b/server/client.go @@ -2654,7 +2654,7 @@ func (c *client) addShadowSub(sub *subscription, ime *ime) (*subscription, error nsub := *sub // copy nsub.im = im - if !im.usePub && ime.dyn { + if !im.usePub && ime.dyn && im.tr != nil { if im.rtr == nil { im.rtr = im.tr.reverse() } @@ -4159,7 +4159,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, continue } if sub.im.tr != nil { - to, _ := sub.im.tr.transformSubject(string(dsubj)) + to, _ := sub.im.tr.transformSubject(string(subject)) dsubj = append(_dsubj[:0], to...) } else if sub.im.usePub { dsubj = append(_dsubj[:0], subj...) @@ -4300,7 +4300,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, continue } if sub.im.tr != nil { - to, _ := sub.im.tr.transformSubject(string(subj)) + to, _ := sub.im.tr.transformSubject(string(subject)) dsubj = append(_dsubj[:0], to...) } else if sub.im.usePub { dsubj = append(_dsubj[:0], subj...) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 19f56a96..7cf7ba66 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -17006,7 +17006,8 @@ func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) { users: [ {user: js, password: pwd} ] exports [ # This is streaming to a delivery subject for a push based consumer. - { stream: "deliver.ORDERS" } + { stream: "deliver.*" } + { stream: "foo.*" } # This is to ack received messages. This is a service to support sync ack. { service: "$JS.ACK.ORDERS.*.>" } # To support ordered consumers, flow control. @@ -17016,8 +17017,9 @@ func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) { IM: { users: [ {user: im, password: pwd} ] imports [ - { stream: { account: JS, subject: "deliver.ORDERS" }, to: "d" } - { service: {account: JS, subject: "$JS.FC.>" }} + { stream: { account: JS, subject: "deliver.ORDERS" }, to: "d.*" } + { stream: { account: JS, subject: "foo.*" }, to: "bar.*" } + { service: { account: JS, subject: "$JS.FC.>" }} ] }, } @@ -17061,10 +17063,10 @@ func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) { var sub *nats.Subscription if queue { - sub, err = nc2.QueueSubscribeSync("d", queueName) + sub, err = nc2.QueueSubscribeSync("d.ORDERS", queueName) require_NoError(t, err) } else { - sub, err = nc2.SubscribeSync("d") + sub, err = nc2.SubscribeSync("d.ORDERS") require_NoError(t, err) } @@ -17074,6 +17076,28 @@ func TestJetStreamImportConsumerStreamSubjectRemapSingle(t *testing.T) { if m.Subject != "foo" { t.Fatalf("Subject not mapped correctly across account boundary, expected %q got %q", "foo", m.Subject) } + + // Now do one that would kick in a transform. + _, err = js.AddConsumer("ORDERS", &nats.ConsumerConfig{ + DeliverSubject: "foo.ORDERS", + AckPolicy: nats.AckExplicitPolicy, + DeliverGroup: queueName, + }) + require_NoError(t, err) + + if queue { + sub, err = nc2.QueueSubscribeSync("bar.ORDERS", queueName) + require_NoError(t, err) + } else { + sub, err = nc2.SubscribeSync("bar.ORDERS") + require_NoError(t, err) + } + m, err = sub.NextMsg(time.Second) + require_NoError(t, err) + + if m.Subject != "foo" { + t.Fatalf("Subject not mapped correctly across account boundary, expected %q got %q", "foo", m.Subject) + } } t.Run("noqueue", func(t *testing.T) {