[FIXED] Service import/export cycles causing stack overflow

There was a way to detect a cycle but I believe it needs to be
a stack of "si" not just the one before invoking processServiceImport.

Changes in #3393 would solve issue reported with test TestAccountImportCycle,
but would not address the new reported issue represented by new test
TestLeafNodeSvcImportExportCycle. This current approach seems to solve
all known cases.

Resolves #3397
Replaces #3393
This commit is contained in:
Ivan Kozlovic
2022-08-26 14:34:53 -06:00
parent 97bba60bb5
commit a8318d1f62
5 changed files with 137 additions and 50 deletions

View File

@@ -2141,15 +2141,8 @@ func (a *Account) processServiceImportResponse(sub *subscription, c *client, _ *
}
a.mu.RUnlock()
old := c.pa.subject
if c.kind == CLIENT || c.kind == LEAF {
// reset state to prior to service invocation (code to reset c.pa.subject to old may not be necessary)
c.pa.subject = []byte(si.to)
}
// Send for normal processing.
c.processServiceImport(si, a, msg)
c.pa.subject = old
}
// Will create the response prefix for fast generation of responses.

View File

@@ -3526,29 +3526,32 @@ func TestAccountUserSubPermsWithQueueGroups(t *testing.T) {
}
func TestAccountImportCycle(t *testing.T) {
cf := createConfFile(t, []byte(`
port: -1
accounts: {
CP: {
users: [
{user: cp, password: cp},
],
exports: [
{service: "q1.>", response_type: Singleton},
{service: "q2.>", response_type: Singleton},
],
},
A: {
users: [
{user: a, password: a},
],
imports: [
{service: {account: CP, subject: "q1.>"}},
{service: {account: CP, subject: "q2.>"}},
]
},
}
`))
tmpl := `
port: -1
accounts: {
CP: {
users: [
{user: cp, password: cp},
],
exports: [
{service: "q1.>", response_type: Singleton},
{service: "q2.>", response_type: Singleton},
%s
],
},
A: {
users: [
{user: a, password: a},
],
imports: [
{service: {account: CP, subject: "q1.>"}},
{service: {account: CP, subject: "q2.>"}},
%s
]
},
}
`
cf := createConfFile(t, []byte(fmt.Sprintf(tmpl, _EMPTY_, _EMPTY_)))
defer removeFile(t, cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
@@ -3558,22 +3561,27 @@ func TestAccountImportCycle(t *testing.T) {
ncA, err := nats.Connect(s.ClientURL(), nats.UserInfo("a", "a"))
require_NoError(t, err)
defer ncA.Close()
// setup reply
subCp, err := ncCp.SubscribeSync("q1.>")
require_NoError(t, err)
// setup requestor and send reply
// setup responder
natsSub(t, ncCp, "q1.>", func(m *nats.Msg) { m.Respond([]byte("reply")) })
// setup requestor
ib := "q2.inbox"
subAResp, err := ncA.SubscribeSync(ib)
require_NoError(t, err)
// send request
err = ncA.PublishRequest("q1.a", ib, []byte("test"))
req := func() {
t.Helper()
// send request
err = ncA.PublishRequest("q1.a", ib, []byte("test"))
require_NoError(t, err)
mRep, err := subAResp.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, string(mRep.Data), "reply")
}
req()
// Update the config and do a config reload and make sure it all still work
changeCurrentConfigContentWithNewContent(t, cf, []byte(
fmt.Sprintf(tmpl, `{service: "q3.>", response_type: Singleton},`, `{service: {account: CP, subject: "q3.>"}},`)))
err = s.Reload()
require_NoError(t, err)
// reply
mReq, err := subCp.NextMsg(time.Second)
require_NoError(t, err)
err = mReq.Respond([]byte("reply"))
require_NoError(t, err)
mRep, err := subAResp.NextMsg(time.Second)
require_NoError(t, err)
require_Contains(t, string(mRep.Data), "reply")
req()
}

View File

@@ -3858,8 +3858,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
}
// If we are here and we are a serviceImport response make sure we are not matching back
// to the import/export pair that started the request. If so ignore.
if isResponse && c.pa.psi != nil && c.pa.psi.se == si.se {
return
if isResponse && len(c.pa.psi) > 0 {
for i := len(c.pa.psi) - 1; i >= 0; i-- {
if psi := c.pa.psi[i]; psi.se == si.se {
return
}
}
}
acc.mu.RLock()
@@ -3935,11 +3939,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
}
// Set previous service import to detect chaining.
hadPrevSi, share := c.pa.psi != nil, si.share
lpsi := len(c.pa.psi)
hadPrevSi, share := lpsi > 0, si.share
if hadPrevSi {
share = c.pa.psi.share
share = c.pa.psi[lpsi-1].share
}
c.pa.psi = si
c.pa.psi = append(c.pa.psi, si)
// Place our client info for the request in the original message.
// This will survive going across routes, etc.

View File

@@ -1205,3 +1205,84 @@ default_js_domain: {B:"DHUB"}
require_NoError(t, err)
require_Equal(t, si.Cluster.Name, "HUB")
}
func TestLeafNodeSvcImportExportCycle(t *testing.T) {
accounts := `
accounts {
SYS: {
users: [{user: admin, password: admin}]
}
LEAF_USER: {
users: [{user: leaf_user, password: leaf_user}]
imports: [
{service: {account: LEAF_INGRESS, subject: "foo"}}
{service: {account: LEAF_INGRESS, subject: "_INBOX.>"}}
{service: {account: LEAF_INGRESS, subject: "$JS.leaf.API.>"}, to: "JS.leaf_ingress@leaf.API.>" }
]
jetstream: enabled
}
LEAF_INGRESS: {
users: [{user: leaf_ingress, password: leaf_ingress}]
exports: [
{service: "foo", accounts: [LEAF_USER]}
{service: "_INBOX.>", accounts: [LEAF_USER]}
{service: "$JS.leaf.API.>", response_type: "stream", accounts: [LEAF_USER]}
]
imports: [
]
jetstream: enabled
}
}
system_account: SYS
`
hconf := createConfFile(t, []byte(fmt.Sprintf(`
%s
listen: "127.0.0.1:-1"
leafnodes {
listen: "127.0.0.1:-1"
}
`, accounts)))
defer os.Remove(hconf)
s, o := RunServerWithConfig(hconf)
defer s.Shutdown()
lconf := createConfFile(t, []byte(fmt.Sprintf(`
%s
server_name: leaf-server
jetstream {
store_dir: '%s'
domain=leaf
}
listen: "127.0.0.1:-1"
leafnodes {
remotes = [
{
urls: ["nats-leaf://leaf_ingress:leaf_ingress@127.0.0.1:%v"]
account: "LEAF_INGRESS"
}
]
}
`, accounts, createDir(t, JetStreamStoreDir), o.LeafNode.Port)))
defer os.Remove(lconf)
sl, so := RunServerWithConfig(lconf)
defer sl.Shutdown()
checkLeafNodeConnected(t, sl)
nc := natsConnect(t, fmt.Sprintf("nats://leaf_user:leaf_user@127.0.0.1:%v", so.Port))
defer nc.Close()
js, _ := nc.JetStream(nats.APIPrefix("JS.leaf_ingress@leaf.API."))
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: nats.FileStorage,
})
require_NoError(t, err)
_, err = js.Publish("foo", []byte("msg"))
require_NoError(t, err)
}

View File

@@ -48,7 +48,7 @@ type pubArg struct {
queues [][]byte
size int
hdr int
psi *serviceImport
psi []*serviceImport
}
// Parser constants