Make the Leafnode internal sub on _GR_.>

This is needed for mapped gateway replies. We had used an extra
token when implementing the new prefix, but it was then removed,
but the leafnode subscription on _GR_.*.*.*.> was not updated.
We now subscribe on _GR_.>
There was a test that was passing because we were using inboxes
that caused the pattern to match. Replaced with single token
reply so that it would have caught this bug.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2019-11-17 17:37:09 -07:00
parent 006a2a94fc
commit e0bc81d0ed
2 changed files with 16 additions and 3 deletions

View File

@@ -1002,7 +1002,7 @@ func (s *Server) initLeafNodeSmap(c *client) int {
if c.isSolicitedLeafNode() {
// Add a fake subscription for this solicited leafnode connection
// so that we can send back directly for mapped GW replies.
c.srv.gwLeafSubs.Insert(&subscription{client: c, subject: []byte(gwReplyPrefix + "*.*.*.>")})
c.srv.gwLeafSubs.Insert(&subscription{client: c, subject: []byte(gwReplyPrefix + ">")})
}
// Now walk the results and add them to our smap
@@ -1022,7 +1022,7 @@ func (s *Server) initLeafNodeSmap(c *client) int {
// TODO(dlc) - Should we lock this down more?
if applyGlobalRouting {
c.leaf.smap[oldGWReplyPrefix+"*.>"]++
c.leaf.smap[gwReplyPrefix+"*.*.*.>"]++
c.leaf.smap[gwReplyPrefix+">"]++
}
// Detect loop by subscribing to a specific subject and checking
// if this is coming back to us.

View File

@@ -32,6 +32,7 @@ import (
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
"github.com/nats-io/nuid"
)
func createLeafConn(t tLogger, host string, port int) net.Conn {
@@ -2622,9 +2623,21 @@ func TestLeafNodeAndGatewayGlobalRouting(t *testing.T) {
}
defer nc.Close()
if _, err := nc.Request("foo", []byte("Hello"), 250*time.Millisecond); err != nil {
// We don't use an INBOX here because we had a bug where
// leafnode would subscribe to _GR_.*.*.*.> instead of
// _GR_.>, and inbox masked that because of their number
// of tokens.
reply := nuid.Next()
sub, err := nc.SubscribeSync(reply)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := nc.PublishRequest("foo", reply, []byte("Hello")); err != nil {
t.Fatalf("Failed to get response: %v", err)
}
if _, err := sub.NextMsg(250 * time.Millisecond); err != nil {
t.Fatalf("Did not get reply: %v", err)
}
nc.Close()
}
}