Merge pull request #3407 from nats-io/svc_imp_exp_cycle

[FIXED] Service import/export cycles causing stack overflow
This commit is contained in:
Ivan Kozlovic
2022-08-26 16:31:04 -06:00
committed by GitHub
5 changed files with 137 additions and 50 deletions

View File

@@ -2141,15 +2141,8 @@ func (a *Account) processServiceImportResponse(sub *subscription, c *client, _ *
}
a.mu.RUnlock()
old := c.pa.subject
if c.kind == CLIENT || c.kind == LEAF {
// reset state to prior to service invocation (code to reset c.pa.subject to old may not be necessary)
c.pa.subject = []byte(si.to)
}
// Send for normal processing.
c.processServiceImport(si, a, msg)
c.pa.subject = old
}
// Will create the response prefix for fast generation of responses.

View File

@@ -3526,29 +3526,32 @@ func TestAccountUserSubPermsWithQueueGroups(t *testing.T) {
}
func TestAccountImportCycle(t *testing.T) {
cf := createConfFile(t, []byte(`
port: -1
accounts: {
CP: {
users: [
{user: cp, password: cp},
],
exports: [
{service: "q1.>", response_type: Singleton},
{service: "q2.>", response_type: Singleton},
],
},
A: {
users: [
{user: a, password: a},
],
imports: [
{service: {account: CP, subject: "q1.>"}},
{service: {account: CP, subject: "q2.>"}},
]
},
}
`))
tmpl := `
port: -1
accounts: {
CP: {
users: [
{user: cp, password: cp},
],
exports: [
{service: "q1.>", response_type: Singleton},
{service: "q2.>", response_type: Singleton},
%s
],
},
A: {
users: [
{user: a, password: a},
],
imports: [
{service: {account: CP, subject: "q1.>"}},
{service: {account: CP, subject: "q2.>"}},
%s
]
},
}
`
cf := createConfFile(t, []byte(fmt.Sprintf(tmpl, _EMPTY_, _EMPTY_)))
defer removeFile(t, cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
@@ -3558,22 +3561,27 @@ func TestAccountImportCycle(t *testing.T) {
ncA, err := nats.Connect(s.ClientURL(), nats.UserInfo("a", "a"))
require_NoError(t, err)
defer ncA.Close()
// setup reply
subCp, err := ncCp.SubscribeSync("q1.>")
require_NoError(t, err)
// setup requestor and send reply
// setup responder
natsSub(t, ncCp, "q1.>", func(m *nats.Msg) { m.Respond([]byte("reply")) })
// setup requestor
ib := "q2.inbox"
subAResp, err := ncA.SubscribeSync(ib)
require_NoError(t, err)
// send request
err = ncA.PublishRequest("q1.a", ib, []byte("test"))
req := func() {
t.Helper()
// send request
err = ncA.PublishRequest("q1.a", ib, []byte("test"))
require_NoError(t, err)
mRep, err := subAResp.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, string(mRep.Data), "reply")
}
req()
// Update the config and do a config reload and make sure it all still work
changeCurrentConfigContentWithNewContent(t, cf, []byte(
fmt.Sprintf(tmpl, `{service: "q3.>", response_type: Singleton},`, `{service: {account: CP, subject: "q3.>"}},`)))
err = s.Reload()
require_NoError(t, err)
// reply
mReq, err := subCp.NextMsg(time.Second)
require_NoError(t, err)
err = mReq.Respond([]byte("reply"))
require_NoError(t, err)
mRep, err := subAResp.NextMsg(time.Second)
require_NoError(t, err)
require_Contains(t, string(mRep.Data), "reply")
req()
}

View File

@@ -3858,8 +3858,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
}
// If we are here and we are a serviceImport response make sure we are not matching back
// to the import/export pair that started the request. If so ignore.
if isResponse && c.pa.psi != nil && c.pa.psi.se == si.se {
return
if isResponse && len(c.pa.psi) > 0 {
for i := len(c.pa.psi) - 1; i >= 0; i-- {
if psi := c.pa.psi[i]; psi.se == si.se {
return
}
}
}
acc.mu.RLock()
@@ -3935,11 +3939,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
}
// Set previous service import to detect chaining.
hadPrevSi, share := c.pa.psi != nil, si.share
lpsi := len(c.pa.psi)
hadPrevSi, share := lpsi > 0, si.share
if hadPrevSi {
share = c.pa.psi.share
share = c.pa.psi[lpsi-1].share
}
c.pa.psi = si
c.pa.psi = append(c.pa.psi, si)
// Place our client info for the request in the original message.
// This will survive going across routes, etc.

View File

@@ -1205,3 +1205,84 @@ default_js_domain: {B:"DHUB"}
require_NoError(t, err)
require_Equal(t, si.Cluster.Name, "HUB")
}
func TestLeafNodeSvcImportExportCycle(t *testing.T) {
accounts := `
accounts {
SYS: {
users: [{user: admin, password: admin}]
}
LEAF_USER: {
users: [{user: leaf_user, password: leaf_user}]
imports: [
{service: {account: LEAF_INGRESS, subject: "foo"}}
{service: {account: LEAF_INGRESS, subject: "_INBOX.>"}}
{service: {account: LEAF_INGRESS, subject: "$JS.leaf.API.>"}, to: "JS.leaf_ingress@leaf.API.>" }
]
jetstream: enabled
}
LEAF_INGRESS: {
users: [{user: leaf_ingress, password: leaf_ingress}]
exports: [
{service: "foo", accounts: [LEAF_USER]}
{service: "_INBOX.>", accounts: [LEAF_USER]}
{service: "$JS.leaf.API.>", response_type: "stream", accounts: [LEAF_USER]}
]
imports: [
]
jetstream: enabled
}
}
system_account: SYS
`
hconf := createConfFile(t, []byte(fmt.Sprintf(`
%s
listen: "127.0.0.1:-1"
leafnodes {
listen: "127.0.0.1:-1"
}
`, accounts)))
defer os.Remove(hconf)
s, o := RunServerWithConfig(hconf)
defer s.Shutdown()
lconf := createConfFile(t, []byte(fmt.Sprintf(`
%s
server_name: leaf-server
jetstream {
store_dir: '%s'
domain=leaf
}
listen: "127.0.0.1:-1"
leafnodes {
remotes = [
{
urls: ["nats-leaf://leaf_ingress:leaf_ingress@127.0.0.1:%v"]
account: "LEAF_INGRESS"
}
]
}
`, accounts, createDir(t, JetStreamStoreDir), o.LeafNode.Port)))
defer os.Remove(lconf)
sl, so := RunServerWithConfig(lconf)
defer sl.Shutdown()
checkLeafNodeConnected(t, sl)
nc := natsConnect(t, fmt.Sprintf("nats://leaf_user:leaf_user@127.0.0.1:%v", so.Port))
defer nc.Close()
js, _ := nc.JetStream(nats.APIPrefix("JS.leaf_ingress@leaf.API."))
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: nats.FileStorage,
})
require_NoError(t, err)
_, err = js.Publish("foo", []byte("msg"))
require_NoError(t, err)
}

View File

@@ -48,7 +48,7 @@ type pubArg struct {
queues [][]byte
size int
hdr int
psi *serviceImport
psi []*serviceImport
}
// Parser constants