[fixed] LeafNode sending message using stream's import subject.

A publish on "a" becomes an LMSG on ">" which
is the stream import's subject. The subscriber on "a" on the other
side did not receive the message.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2020-12-15 10:11:19 -07:00
committed by Matthias Hanel
parent e81b208591
commit eafc6b7a25
2 changed files with 51 additions and 1 deletions

View File

@@ -2802,7 +2802,7 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac
if rt.sub.im.tr != nil {
to, _ := rt.sub.im.tr.transformSubject(string(subj))
subj = []byte(to)
} else {
} else if !rt.sub.im.usePub {
subj = []byte(rt.sub.im.to)
}
}

View File

@@ -2825,3 +2825,53 @@ func TestLeafNodeWSGossip(t *testing.T) {
t.Fatal("Leafnode connection is not websocket!")
}
}
func TestLeafNodeStreamImport(t *testing.T) {
o1 := DefaultOptions()
o1.LeafNode.Port = -1
accA := NewAccount("A")
o1.Accounts = []*Account{accA}
o1.Users = []*User{&User{Username: "a", Password: "a", Account: accA}}
o1.LeafNode.Account = "A"
o1.NoAuthUser = "a"
s1 := RunServer(o1)
defer s1.Shutdown()
o2 := DefaultOptions()
o2.LeafNode.Port = -1
accB := NewAccount("B")
if err := accB.AddStreamExport(">", nil); err != nil {
t.Fatalf("Error adding stream export: %v", err)
}
accC := NewAccount("C")
if err := accC.AddStreamImport(accB, ">", ""); err != nil {
t.Fatalf("Error adding stream import: %v", err)
}
o2.Accounts = []*Account{accB, accC}
o2.Users = []*User{&User{Username: "b", Password: "b", Account: accB}, &User{Username: "c", Password: "c", Account: accC}}
o2.NoAuthUser = "b"
u, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port))
if err != nil {
t.Fatalf("Error parsing url: %v", err)
}
o2.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{u}, LocalAccount: "C"}}
s2 := RunServer(o2)
defer s2.Shutdown()
nc1 := natsConnect(t, s1.ClientURL())
defer nc1.Close()
sub := natsSubSync(t, nc1, "a")
checkSubInterest(t, s2, "C", "a", time.Second)
nc2 := natsConnect(t, s2.ClientURL())
defer nc2.Close()
natsPub(t, nc2, "a", []byte("hello?"))
natsNexMsg(t, sub, time.Second)
}