diff --git a/server/accounts.go b/server/accounts.go index c8dd2045..8c68d874 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -2141,15 +2141,8 @@ func (a *Account) processServiceImportResponse(sub *subscription, c *client, _ * } a.mu.RUnlock() - old := c.pa.subject - if c.kind == CLIENT || c.kind == LEAF { - // reset state to prior to service invocation (code to reset c.pa.subject to old may not be necessary) - c.pa.subject = []byte(si.to) - } - // Send for normal processing. c.processServiceImport(si, a, msg) - c.pa.subject = old } // Will create the response prefix for fast generation of responses. diff --git a/server/accounts_test.go b/server/accounts_test.go index 30479f87..3d715ed9 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -3526,29 +3526,32 @@ func TestAccountUserSubPermsWithQueueGroups(t *testing.T) { } func TestAccountImportCycle(t *testing.T) { - cf := createConfFile(t, []byte(` - port: -1 - accounts: { - CP: { - users: [ - {user: cp, password: cp}, - ], - exports: [ - {service: "q1.>", response_type: Singleton}, - {service: "q2.>", response_type: Singleton}, - ], - }, - A: { - users: [ - {user: a, password: a}, - ], - imports: [ - {service: {account: CP, subject: "q1.>"}}, - {service: {account: CP, subject: "q2.>"}}, - ] - }, - } - `)) + tmpl := ` + port: -1 + accounts: { + CP: { + users: [ + {user: cp, password: cp}, + ], + exports: [ + {service: "q1.>", response_type: Singleton}, + {service: "q2.>", response_type: Singleton}, + %s + ], + }, + A: { + users: [ + {user: a, password: a}, + ], + imports: [ + {service: {account: CP, subject: "q1.>"}}, + {service: {account: CP, subject: "q2.>"}}, + %s + ] + }, + } + ` + cf := createConfFile(t, []byte(fmt.Sprintf(tmpl, _EMPTY_, _EMPTY_))) defer removeFile(t, cf) s, _ := RunServerWithConfig(cf) defer s.Shutdown() @@ -3558,22 +3561,27 @@ func TestAccountImportCycle(t *testing.T) { ncA, err := nats.Connect(s.ClientURL(), nats.UserInfo("a", "a")) require_NoError(t, err) defer ncA.Close() - // setup reply - subCp, err := ncCp.SubscribeSync("q1.>") - require_NoError(t, err) - // setup requestor and send reply + // setup responder + natsSub(t, ncCp, "q1.>", func(m *nats.Msg) { m.Respond([]byte("reply")) }) + // setup requestor ib := "q2.inbox" subAResp, err := ncA.SubscribeSync(ib) require_NoError(t, err) - // send request - err = ncA.PublishRequest("q1.a", ib, []byte("test")) + req := func() { + t.Helper() + // send request + err = ncA.PublishRequest("q1.a", ib, []byte("test")) + require_NoError(t, err) + mRep, err := subAResp.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, string(mRep.Data), "reply") + } + req() + + // Update the config and do a config reload and make sure it all still work + changeCurrentConfigContentWithNewContent(t, cf, []byte( + fmt.Sprintf(tmpl, `{service: "q3.>", response_type: Singleton},`, `{service: {account: CP, subject: "q3.>"}},`))) + err = s.Reload() require_NoError(t, err) - // reply - mReq, err := subCp.NextMsg(time.Second) - require_NoError(t, err) - err = mReq.Respond([]byte("reply")) - require_NoError(t, err) - mRep, err := subAResp.NextMsg(time.Second) - require_NoError(t, err) - require_Contains(t, string(mRep.Data), "reply") + req() } diff --git a/server/client.go b/server/client.go index 601078f6..44177fbd 100644 --- a/server/client.go +++ b/server/client.go @@ -3858,8 +3858,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt } // If we are here and we are a serviceImport response make sure we are not matching back // to the import/export pair that started the request. If so ignore. - if isResponse && c.pa.psi != nil && c.pa.psi.se == si.se { - return + if isResponse && len(c.pa.psi) > 0 { + for i := len(c.pa.psi) - 1; i >= 0; i-- { + if psi := c.pa.psi[i]; psi.se == si.se { + return + } + } } acc.mu.RLock() @@ -3935,11 +3939,12 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt } // Set previous service import to detect chaining. - hadPrevSi, share := c.pa.psi != nil, si.share + lpsi := len(c.pa.psi) + hadPrevSi, share := lpsi > 0, si.share if hadPrevSi { - share = c.pa.psi.share + share = c.pa.psi[lpsi-1].share } - c.pa.psi = si + c.pa.psi = append(c.pa.psi, si) // Place our client info for the request in the original message. // This will survive going across routes, etc. diff --git a/server/jetstream_leafnode_test.go b/server/jetstream_leafnode_test.go index 5343bbb9..6277c1fc 100644 --- a/server/jetstream_leafnode_test.go +++ b/server/jetstream_leafnode_test.go @@ -1205,3 +1205,84 @@ default_js_domain: {B:"DHUB"} require_NoError(t, err) require_Equal(t, si.Cluster.Name, "HUB") } + +func TestLeafNodeSvcImportExportCycle(t *testing.T) { + accounts := ` + accounts { + SYS: { + users: [{user: admin, password: admin}] + } + LEAF_USER: { + users: [{user: leaf_user, password: leaf_user}] + imports: [ + {service: {account: LEAF_INGRESS, subject: "foo"}} + {service: {account: LEAF_INGRESS, subject: "_INBOX.>"}} + {service: {account: LEAF_INGRESS, subject: "$JS.leaf.API.>"}, to: "JS.leaf_ingress@leaf.API.>" } + ] + jetstream: enabled + } + LEAF_INGRESS: { + users: [{user: leaf_ingress, password: leaf_ingress}] + exports: [ + {service: "foo", accounts: [LEAF_USER]} + {service: "_INBOX.>", accounts: [LEAF_USER]} + {service: "$JS.leaf.API.>", response_type: "stream", accounts: [LEAF_USER]} + ] + imports: [ + ] + jetstream: enabled + } + } + system_account: SYS + ` + + hconf := createConfFile(t, []byte(fmt.Sprintf(` + %s + listen: "127.0.0.1:-1" + leafnodes { + listen: "127.0.0.1:-1" + } + `, accounts))) + defer os.Remove(hconf) + s, o := RunServerWithConfig(hconf) + defer s.Shutdown() + + lconf := createConfFile(t, []byte(fmt.Sprintf(` + %s + server_name: leaf-server + jetstream { + store_dir: '%s' + domain=leaf + } + + listen: "127.0.0.1:-1" + leafnodes { + remotes = [ + { + urls: ["nats-leaf://leaf_ingress:leaf_ingress@127.0.0.1:%v"] + account: "LEAF_INGRESS" + } + ] + } + `, accounts, createDir(t, JetStreamStoreDir), o.LeafNode.Port))) + defer os.Remove(lconf) + sl, so := RunServerWithConfig(lconf) + defer sl.Shutdown() + + checkLeafNodeConnected(t, sl) + + nc := natsConnect(t, fmt.Sprintf("nats://leaf_user:leaf_user@127.0.0.1:%v", so.Port)) + defer nc.Close() + + js, _ := nc.JetStream(nats.APIPrefix("JS.leaf_ingress@leaf.API.")) + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: nats.FileStorage, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", []byte("msg")) + require_NoError(t, err) +} diff --git a/server/parser.go b/server/parser.go index d5c9b495..74f55f57 100644 --- a/server/parser.go +++ b/server/parser.go @@ -48,7 +48,7 @@ type pubArg struct { queues [][]byte size int hdr int - psi *serviceImport + psi []*serviceImport } // Parser constants