diff --git a/server/accounts.go b/server/accounts.go index e47ab83d..497d437d 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -118,7 +118,6 @@ type serviceImport struct { sid []byte from string to string - exsub string tr *transform ts int64 rt ServiceRespType @@ -1737,7 +1736,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im } } } - si := &serviceImport{dest, claim, se, nil, from, to, "", tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, false, isSysAcc, nil} + si := &serviceImport{dest, claim, se, nil, from, to, tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, false, isSysAcc, nil} a.imports.services[from] = si a.mu.Unlock() @@ -2152,7 +2151,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp // dest is the requestor's account. a is the service responder with the export. // Marked as internal here, that is how we distinguish. - si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, false, false, nil} + si := &serviceImport{dest, nil, osi.se, nil, nrr, to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, false, false, nil} if a.exports.responses == nil { a.exports.responses = make(map[string]*serviceImport) diff --git a/server/client.go b/server/client.go index 806978c3..e6ad311a 100644 --- a/server/client.go +++ b/server/client.go @@ -3554,7 +3554,14 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra // processServiceImport is an internal callback when a subscription matches an imported service // from another account. This includes response mappings as well. func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) { - if c.kind == GATEWAY && !si.isRespServiceImport() { + // If we are a GW and this is not a direct serviceImport ignore. + isResponse := si.isRespServiceImport() + if c.kind == GATEWAY && !isResponse { + return + } + // If we are here and we are a serviceImport response make sure we are not matching back + // to the import/export pair that started the request. If so ignore. + if isResponse && c.pa.psi != nil && c.pa.psi.se == si.se { return } @@ -3606,8 +3613,11 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // so we only lock once. to, _ = si.acc.selectMappedSubject(to) - oreply, oacc := c.pa.reply, c.acc + // Save off some of our state. + oreply, oacc, opsi := c.pa.reply, c.acc, c.pa.psi c.pa.reply = nrr + c.pa.psi = si + if !si.isSysAcc { c.mu.Lock() c.acc = si.acc @@ -3648,6 +3658,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // Put what was there back now. c.in.rts = orts c.pa.reply = oreply + c.pa.psi = opsi if !si.isSysAcc { c.mu.Lock() @@ -3817,7 +3828,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, } } - // Remap to original if internal. + // Remap to the original subject if internal. if sub.icb != nil { subj = subject } diff --git a/server/parser.go b/server/parser.go index ad2a83b3..b6bbd537 100644 --- a/server/parser.go +++ b/server/parser.go @@ -47,6 +47,7 @@ type pubArg struct { queues [][]byte size int hdr int + psi *serviceImport } // Parser constants diff --git a/test/accounts_cycles_test.go b/test/accounts_cycles_test.go index 4974cbae..3d77f3ff 100644 --- a/test/accounts_cycles_test.go +++ b/test/accounts_cycles_test.go @@ -18,8 +18,10 @@ import ( "os" "strings" "testing" + "time" "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" ) func TestAccountCycleService(t *testing.T) { @@ -213,6 +215,91 @@ func TestAccountCycleServiceNonCycleChain(t *testing.T) { } } +// bug: https://github.com/nats-io/nats-server/issues/1769 +func TestServiceImportReplyMatchCycle(t *testing.T) { + conf := createConfFile(t, []byte(` + port: -1 + accounts { + A { + users: [{user: d, pass: x}] + imports [ {service: {account: B, subject: ">" }}] + } + B { + users: [{user: x, pass: x}] + exports [ { service: ">" } ] + } + } + no_auth_user: d + `)) + defer os.Remove(conf) + + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + + nc1 := clientConnectToServerWithUP(t, opts, "x", "x") + defer nc1.Close() + + msg := []byte("HELLO") + nc1.Subscribe("foo", func(m *nats.Msg) { + m.Respond(msg) + }) + + nc2 := clientConnectToServer(t, s) + defer nc2.Close() + + resp, err := nc2.Request("foo", nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if resp == nil || string(resp.Data) != string(msg) { + t.Fatalf("Wrong or empty response") + } +} + +func TestServiceImportReplyMatchCycleMultiHops(t *testing.T) { + conf := createConfFile(t, []byte(` + port: -1 + accounts { + A { + users: [{user: d, pass: x}] + imports [ {service: {account: B, subject: ">" }}] + } + B { + exports [ { service: ">" } ] + imports [ {service: {account: C, subject: ">" }}] + } + C { + users: [{user: x, pass: x}] + exports [ { service: ">" } ] + } + } + no_auth_user: d + `)) + defer os.Remove(conf) + + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + + nc1 := clientConnectToServerWithUP(t, opts, "x", "x") + defer nc1.Close() + + msg := []byte("HELLO") + nc1.Subscribe("foo", func(m *nats.Msg) { + m.Respond(msg) + }) + + nc2 := clientConnectToServer(t, s) + defer nc2.Close() + + resp, err := nc2.Request("foo", nil, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if resp == nil || string(resp.Data) != string(msg) { + t.Fatalf("Wrong or empty response") + } +} + // Go's stack are infinite sans memory, but not call depth. However its good to limit. func TestAccountCycleDepthLimit(t *testing.T) { var last *server.Account