From a8318d1f6231d1eeae37f92ff9ab9d600544a08f Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 26 Aug 2022 14:34:53 -0600 Subject: [PATCH] [FIXED] Service import/export cycles causing stack overflow There was a way to detect a cycle but I believe it needs to be a stack of "si" not just the one before invoking processServiceImport. Changes in #3393 would solve issue reported with test TestAccountImportCycle, but would not address the new reported issue represented by new test TestLeafNodeSvcImportExportCycle. This current approach seems to solve all known cases. Resolves #3397 Replaces #3393 --- server/accounts.go | 7 --- server/accounts_test.go | 82 +++++++++++++++++-------------- server/client.go | 15 ++++-- server/jetstream_leafnode_test.go | 81 ++++++++++++++++++++++++++++++ server/parser.go | 2 +- 5 files changed, 137 insertions(+), 50 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index c8dd2045..8c68d874 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -2141,15 +2141,8 @@ func (a *Account) processServiceImportResponse(sub *subscription, c *client, _ * } a.mu.RUnlock() - old := c.pa.subject - if c.kind == CLIENT || c.kind == LEAF { - // reset state to prior to service invocation (code to reset c.pa.subject to old may not be necessary) - c.pa.subject = []byte(si.to) - } - // Send for normal processing. c.processServiceImport(si, a, msg) - c.pa.subject = old } // Will create the response prefix for fast generation of responses. diff --git a/server/accounts_test.go b/server/accounts_test.go index 30479f87..3d715ed9 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -3526,29 +3526,32 @@ func TestAccountUserSubPermsWithQueueGroups(t *testing.T) { } func TestAccountImportCycle(t *testing.T) { - cf := createConfFile(t, []byte(` - port: -1 - accounts: { - CP: { - users: [ - {user: cp, password: cp}, - ], - exports: [ - {service: "q1.>", response_type: Singleton}, - {service: "q2.>", response_type: Singleton}, - ], - }, - A: { - users: [ - {user: a, password: a}, - ], - imports: [ - {service: {account: CP, subject: "q1.>"}}, - {service: {account: CP, subject: "q2.>"}}, - ] - }, - } - `)) + tmpl := ` + port: -1 + accounts: { + CP: { + users: [ + {user: cp, password: cp}, + ], + exports: [ + {service: "q1.>", response_type: Singleton}, + {service: "q2.>", response_type: Singleton}, + %s + ], + }, + A: { + users: [ + {user: a, password: a}, + ], + imports: [ + {service: {account: CP, subject: "q1.>"}}, + {service: {account: CP, subject: "q2.>"}}, + %s + ] + }, + } + ` + cf := createConfFile(t, []byte(fmt.Sprintf(tmpl, _EMPTY_, _EMPTY_))) defer removeFile(t, cf) s, _ := RunServerWithConfig(cf) defer s.Shutdown() @@ -3558,22 +3561,27 @@ func TestAccountImportCycle(t *testing.T) { ncA, err := nats.Connect(s.ClientURL(), nats.UserInfo("a", "a")) require_NoError(t, err) defer ncA.Close() - // setup reply - subCp, err := ncCp.SubscribeSync("q1.>") - require_NoError(t, err) - // setup requestor and send reply + // setup responder + natsSub(t, ncCp, "q1.>", func(m *nats.Msg) { m.Respond([]byte("reply")) }) + // setup requestor ib := "q2.inbox" subAResp, err := ncA.SubscribeSync(ib) require_NoError(t, err) - // send request - err = ncA.PublishRequest("q1.a", ib, []byte("test")) + req := func() { + t.Helper() + // send request + err = ncA.PublishRequest("q1.a", ib, []byte("test")) + require_NoError(t, err) + mRep, err := subAResp.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, string(mRep.Data), "reply") + } + req() + + // Update the config and do a config reload and make sure it all still work + changeCurrentConfigContentWithNewContent(t, cf, []byte( + fmt.Sprintf(tmpl, `{service: "q3.>", response_type: Singleton},`, `{service: {account: CP, subject: "q3.>"}},`))) + err = s.Reload() require_NoError(t, err) - // reply - mReq, err := subCp.NextMsg(time.Second) - require_NoError(t, err) - err = mReq.Respond([]byte("reply")) - require_NoError(t, err) - mRep, err := subAResp.NextMsg(time.Second) - require_NoError(t, err) - require_Contains(t, string(mRep.Data), "reply") + req() } diff --git a/server/client.go b/server/client.go index 601078f6..44177fbd 100644 --- a/server/client.go +++ b/server/client.go @@ -3858,8 +3858,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt } // If we are here and we are a serviceImport response make sure we are not matching back // to the import/export pair that started the request. If so ignore. - if isResponse && c.pa.psi != nil && c.pa.psi.se == si.se { - return + if isResponse && len(c.pa.psi) > 0 { + for i := len(c.pa.psi) - 1; i >= 0; i-- { + if psi := c.pa.psi[i]; psi.se == si.se { + return + } + } } acc.mu.RLock() @@ -3935,11 +3939,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt } // Set previous service import to detect chaining. - hadPrevSi, share := c.pa.psi != nil, si.share + lpsi := len(c.pa.psi) + hadPrevSi, share := lpsi > 0, si.share if hadPrevSi { - share = c.pa.psi.share + share = c.pa.psi[lpsi-1].share } - c.pa.psi = si + c.pa.psi = append(c.pa.psi, si) // Place our client info for the request in the original message. // This will survive going across routes, etc. diff --git a/server/jetstream_leafnode_test.go b/server/jetstream_leafnode_test.go index 5343bbb9..6277c1fc 100644 --- a/server/jetstream_leafnode_test.go +++ b/server/jetstream_leafnode_test.go @@ -1205,3 +1205,84 @@ default_js_domain: {B:"DHUB"} require_NoError(t, err) require_Equal(t, si.Cluster.Name, "HUB") } + +func TestLeafNodeSvcImportExportCycle(t *testing.T) { + accounts := ` + accounts { + SYS: { + users: [{user: admin, password: admin}] + } + LEAF_USER: { + users: [{user: leaf_user, password: leaf_user}] + imports: [ + {service: {account: LEAF_INGRESS, subject: "foo"}} + {service: {account: LEAF_INGRESS, subject: "_INBOX.>"}} + {service: {account: LEAF_INGRESS, subject: "$JS.leaf.API.>"}, to: "JS.leaf_ingress@leaf.API.>" } + ] + jetstream: enabled + } + LEAF_INGRESS: { + users: [{user: leaf_ingress, password: leaf_ingress}] + exports: [ + {service: "foo", accounts: [LEAF_USER]} + {service: "_INBOX.>", accounts: [LEAF_USER]} + {service: "$JS.leaf.API.>", response_type: "stream", accounts: [LEAF_USER]} + ] + imports: [ + ] + jetstream: enabled + } + } + system_account: SYS + ` + + hconf := createConfFile(t, []byte(fmt.Sprintf(` + %s + listen: "127.0.0.1:-1" + leafnodes { + listen: "127.0.0.1:-1" + } + `, accounts))) + defer os.Remove(hconf) + s, o := RunServerWithConfig(hconf) + defer s.Shutdown() + + lconf := createConfFile(t, []byte(fmt.Sprintf(` + %s + server_name: leaf-server + jetstream { + store_dir: '%s' + domain=leaf + } + + listen: "127.0.0.1:-1" + leafnodes { + remotes = [ + { + urls: ["nats-leaf://leaf_ingress:leaf_ingress@127.0.0.1:%v"] + account: "LEAF_INGRESS" + } + ] + } + `, accounts, createDir(t, JetStreamStoreDir), o.LeafNode.Port))) + defer os.Remove(lconf) + sl, so := RunServerWithConfig(lconf) + defer sl.Shutdown() + + checkLeafNodeConnected(t, sl) + + nc := natsConnect(t, fmt.Sprintf("nats://leaf_user:leaf_user@127.0.0.1:%v", so.Port)) + defer nc.Close() + + js, _ := nc.JetStream(nats.APIPrefix("JS.leaf_ingress@leaf.API.")) + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: nats.FileStorage, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", []byte("msg")) + require_NoError(t, err) +} diff --git a/server/parser.go b/server/parser.go index d5c9b495..74f55f57 100644 --- a/server/parser.go +++ b/server/parser.go @@ -48,7 +48,7 @@ type pubArg struct { queues [][]byte size int hdr int - psi *serviceImport + psi []*serviceImport } // Parser constants