From aff10aa16b0cd4c40dcafe4eb366770fd41d64cd Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 14 Apr 2020 09:26:35 -0700 Subject: [PATCH] Fix for #1344 Signed-off-by: Derek Collison --- server/leafnode.go | 11 +++ test/leafnode_test.go | 166 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+) diff --git a/server/leafnode.go b/server/leafnode.go index eb55a20c..98f2d376 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1056,6 +1056,10 @@ func (s *Server) initLeafNodeSmap(c *client) int { } else { acc.sl.All(&subs) } + + // Check if we have an existing service import reply. + siReply := acc.siReply + // Since leaf nodes only send on interest, if the bound // account has import services we need to send those over. for isubj := range acc.imports.services { @@ -1117,6 +1121,13 @@ func (s *Server) initLeafNodeSmap(c *client) int { // if this is coming back to us. c.leaf.smap[lds]++ + // Check if we need to add an existing siReply to our map. + // This will be a prefix so add on the wildcard. + if siReply != nil { + wcsub := append(siReply, '>') + c.leaf.smap[string(wcsub)]++ + } + lenMap := len(c.leaf.smap) c.mu.Unlock() return lenMap diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 144bd04f..d72a57e5 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -3315,6 +3315,8 @@ func TestServiceExportWithMultipleAccounts(t *testing.T) { listen: "127.0.0.1:-1" } `)) + defer os.Remove(confA) + srvA, optsA := RunServerWithConfig(confA) defer srvA.Shutdown() @@ -3354,6 +3356,8 @@ func TestServiceExportWithMultipleAccounts(t *testing.T) { ` confB := createConfFile(t, []byte(fmt.Sprintf(bConfigTemplate, optsA.LeafNode.Port))) + defer os.Remove(confB) + srvB, optsB := RunServerWithConfig(confB) defer srvB.Shutdown() @@ -3385,3 +3389,165 @@ func TestServiceExportWithMultipleAccounts(t *testing.T) { t.Fatal("Did not receive the correct message") } } + +// This will test for a bug in service export/import with leafnode restart. +// https://github.com/nats-io/nats-server/issues/1344 +func TestServiceExportWithLeafnodeRestart(t *testing.T) { + confG := createConfFile(t, []byte(` + server_name: G + listen: 127.0.0.1:-1 + leafnodes { + listen: "127.0.0.1:-1" + authorization { account:"EXTERNAL" } + } + + accounts: { + INTERNAL: { + users: [ + {user: good, password: pwd} + ] + exports: [{service: "foo", response: singleton}] + imports: [ + { + service: { + account: EXTERNAL + subject: "evilfoo" + }, to: from_evilfoo + } + ] + }, + EXTERNAL: { + users: [ + {user: evil, password: pwd} + ] + exports: [{service: "evilfoo", response: singleton}] + imports: [ + { + service: { + account: INTERNAL + subject: "foo" + }, to: goodfoo + } + ] + } + } + `)) + defer os.Remove(confG) + + srvG, optsG := RunServerWithConfig(confG) + defer srvG.Shutdown() + + eConfigTemplate := ` + server_name: E + listen: 127.0.0.1:-1 + leafnodes { + listen: "127.0.0.1:-1" + remotes = [ + { + url:"nats://127.0.0.1:%d" + account:"EXTERNAL_GOOD" + } + ] + } + + accounts: { + INTERNAL_EVILINC: { + users: [ + {user: evil, password: pwd} + ] + exports: [{service: "foo", response: singleton}] + imports: [ + { + service: { + account: EXTERNAL_GOOD + subject: "goodfoo" + }, to: from_goodfoo + } + ] + }, + EXTERNAL_GOOD: { + users: [ + {user: good, password: pwd} + ] + exports: [{service: "goodfoo", response: singleton}] + imports: [ + { + service: { + account: INTERNAL_EVILINC + subject: "foo" + }, to: evilfoo + } + ] + }, + } + ` + + confE := createConfFile(t, []byte(fmt.Sprintf(eConfigTemplate, optsG.LeafNode.Port))) + defer os.Remove(confE) + + srvE, optsE := RunServerWithConfig(confE) + defer srvE.Shutdown() + + // connect to confE, and offer a service + nc2, err := nats.Connect(fmt.Sprintf("nats://evil:pwd@%s:%d", optsE.Host, optsE.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + nc2.Subscribe("foo", func(msg *nats.Msg) { + if err := msg.Respond([]byte("world")); err != nil { + t.Fatalf("Error on respond: %v", err) + } + }) + nc2.Flush() + + nc, err := nats.Connect(fmt.Sprintf("nats://good:pwd@%s:%d", optsG.Host, optsG.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + resp, err := nc.Request("from_evilfoo", []byte("hello"), 2*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if resp == nil || strings.Compare("world", string(resp.Data)) != 0 { + t.Fatal("Did not receive the correct message") + } + + // Now restart server E and requestor and replier. + srvE.Shutdown() + nc.Close() + nc2.Close() + + srvE, optsE = RunServerWithConfig(confE) + defer srvE.Shutdown() + + nc2, err = nats.Connect(fmt.Sprintf("nats://evil:pwd@%s:%d", optsE.Host, optsE.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + nc2.Subscribe("foo", func(msg *nats.Msg) { + if err := msg.Respond([]byte("world")); err != nil { + t.Fatalf("Error on respond: %v", err) + } + }) + nc2.Flush() + + nc, err = nats.Connect(fmt.Sprintf("nats://good:pwd@%s:%d", optsG.Host, optsG.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + resp, err = nc.Request("from_evilfoo", []byte("hello"), 2*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if resp == nil || strings.Compare("world", string(resp.Data)) != 0 { + t.Fatal("Did not receive the correct message") + } +}