From eafc6b7a252dc1e60125b0c6bb9c458bdf7cde65 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 15 Dec 2020 10:11:19 -0700 Subject: [PATCH] [fixed] LeafNode sending message using stream's import subject. A publish on "a" becomes an LMSG on ">" which is the stream import's subject. The subscriber on "a" on the other side did not receive the message. Signed-off-by: Ivan Kozlovic --- server/client.go | 2 +- server/leafnode_test.go | 50 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/server/client.go b/server/client.go index 990796d1..976e8911 100644 --- a/server/client.go +++ b/server/client.go @@ -2802,7 +2802,7 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac if rt.sub.im.tr != nil { to, _ := rt.sub.im.tr.transformSubject(string(subj)) subj = []byte(to) - } else { + } else if !rt.sub.im.usePub { subj = []byte(rt.sub.im.to) } } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 788a8f2d..0891d3fb 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -2825,3 +2825,53 @@ func TestLeafNodeWSGossip(t *testing.T) { t.Fatal("Leafnode connection is not websocket!") } } + +func TestLeafNodeStreamImport(t *testing.T) { + o1 := DefaultOptions() + o1.LeafNode.Port = -1 + accA := NewAccount("A") + o1.Accounts = []*Account{accA} + o1.Users = []*User{&User{Username: "a", Password: "a", Account: accA}} + o1.LeafNode.Account = "A" + o1.NoAuthUser = "a" + s1 := RunServer(o1) + defer s1.Shutdown() + + o2 := DefaultOptions() + o2.LeafNode.Port = -1 + + accB := NewAccount("B") + if err := accB.AddStreamExport(">", nil); err != nil { + t.Fatalf("Error adding stream export: %v", err) + } + + accC := NewAccount("C") + if err := accC.AddStreamImport(accB, ">", ""); err != nil { + t.Fatalf("Error adding stream import: %v", err) + } + + o2.Accounts = []*Account{accB, accC} + o2.Users = []*User{&User{Username: "b", Password: "b", Account: accB}, &User{Username: "c", Password: "c", Account: accC}} + o2.NoAuthUser = "b" + u, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port)) + if err != nil { + t.Fatalf("Error parsing url: %v", err) + } + o2.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{u}, LocalAccount: "C"}} + s2 := RunServer(o2) + defer s2.Shutdown() + + nc1 := natsConnect(t, s1.ClientURL()) + defer nc1.Close() + + sub := natsSubSync(t, nc1, "a") + + checkSubInterest(t, s2, "C", "a", time.Second) + + nc2 := natsConnect(t, s2.ClientURL()) + defer nc2.Close() + + natsPub(t, nc2, "a", []byte("hello?")) + + natsNexMsg(t, sub, time.Second) +}