Merge pull request #1773 from nats-io/cycle_wc_bug

Catch condition where a serviceImport response matched the original import.
This commit is contained in:
Derek Collison
2020-12-14 08:20:55 -08:00
committed by GitHub
4 changed files with 104 additions and 6 deletions

View File

@@ -118,7 +118,6 @@ type serviceImport struct {
sid []byte
from string
to string
exsub string
tr *transform
ts int64
rt ServiceRespType
@@ -1737,7 +1736,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
}
}
}
si := &serviceImport{dest, claim, se, nil, from, to, "", tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, false, isSysAcc, nil}
si := &serviceImport{dest, claim, se, nil, from, to, tr, 0, rt, lat, nil, nil, usePub, false, false, false, false, false, isSysAcc, nil}
a.imports.services[from] = si
a.mu.Unlock()
@@ -2152,7 +2151,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp
// dest is the requestor's account. a is the service responder with the export.
// Marked as internal here, that is how we distinguish.
si := &serviceImport{dest, nil, osi.se, nil, nrr, to, osi.to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, false, false, nil}
si := &serviceImport{dest, nil, osi.se, nil, nrr, to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, false, false, nil}
if a.exports.responses == nil {
a.exports.responses = make(map[string]*serviceImport)

View File

@@ -3554,7 +3554,14 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra
// processServiceImport is an internal callback when a subscription matches an imported service
// from another account. This includes response mappings as well.
func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) {
if c.kind == GATEWAY && !si.isRespServiceImport() {
// If we are a GW and this is not a direct serviceImport ignore.
isResponse := si.isRespServiceImport()
if c.kind == GATEWAY && !isResponse {
return
}
// If we are here and we are a serviceImport response make sure we are not matching back
// to the import/export pair that started the request. If so ignore.
if isResponse && c.pa.psi != nil && c.pa.psi.se == si.se {
return
}
@@ -3606,8 +3613,11 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// so we only lock once.
to, _ = si.acc.selectMappedSubject(to)
oreply, oacc := c.pa.reply, c.acc
// Save off some of our state.
oreply, oacc, opsi := c.pa.reply, c.acc, c.pa.psi
c.pa.reply = nrr
c.pa.psi = si
if !si.isSysAcc {
c.mu.Lock()
c.acc = si.acc
@@ -3648,6 +3658,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// Put what was there back now.
c.in.rts = orts
c.pa.reply = oreply
c.pa.psi = opsi
if !si.isSysAcc {
c.mu.Lock()
@@ -3817,7 +3828,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
}
}
// Remap to original if internal.
// Remap to the original subject if internal.
if sub.icb != nil {
subj = subject
}

View File

@@ -47,6 +47,7 @@ type pubArg struct {
queues [][]byte
size int
hdr int
psi *serviceImport
}
// Parser constants

View File

@@ -18,8 +18,10 @@ import (
"os"
"strings"
"testing"
"time"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)
func TestAccountCycleService(t *testing.T) {
@@ -213,6 +215,91 @@ func TestAccountCycleServiceNonCycleChain(t *testing.T) {
}
}
// bug: https://github.com/nats-io/nats-server/issues/1769
func TestServiceImportReplyMatchCycle(t *testing.T) {
conf := createConfFile(t, []byte(`
port: -1
accounts {
A {
users: [{user: d, pass: x}]
imports [ {service: {account: B, subject: ">" }}]
}
B {
users: [{user: x, pass: x}]
exports [ { service: ">" } ]
}
}
no_auth_user: d
`))
defer os.Remove(conf)
s, opts := RunServerWithConfig(conf)
defer s.Shutdown()
nc1 := clientConnectToServerWithUP(t, opts, "x", "x")
defer nc1.Close()
msg := []byte("HELLO")
nc1.Subscribe("foo", func(m *nats.Msg) {
m.Respond(msg)
})
nc2 := clientConnectToServer(t, s)
defer nc2.Close()
resp, err := nc2.Request("foo", nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resp == nil || string(resp.Data) != string(msg) {
t.Fatalf("Wrong or empty response")
}
}
func TestServiceImportReplyMatchCycleMultiHops(t *testing.T) {
conf := createConfFile(t, []byte(`
port: -1
accounts {
A {
users: [{user: d, pass: x}]
imports [ {service: {account: B, subject: ">" }}]
}
B {
exports [ { service: ">" } ]
imports [ {service: {account: C, subject: ">" }}]
}
C {
users: [{user: x, pass: x}]
exports [ { service: ">" } ]
}
}
no_auth_user: d
`))
defer os.Remove(conf)
s, opts := RunServerWithConfig(conf)
defer s.Shutdown()
nc1 := clientConnectToServerWithUP(t, opts, "x", "x")
defer nc1.Close()
msg := []byte("HELLO")
nc1.Subscribe("foo", func(m *nats.Msg) {
m.Respond(msg)
})
nc2 := clientConnectToServer(t, s)
defer nc2.Close()
resp, err := nc2.Request("foo", nil, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if resp == nil || string(resp.Data) != string(msg) {
t.Fatalf("Wrong or empty response")
}
}
// Go's stack are infinite sans memory, but not call depth. However its good to limit.
func TestAccountCycleDepthLimit(t *testing.T) {
var last *server.Account