Do not propagate service import interest across GW and ROUTES

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2021-04-15 11:30:49 -06:00
parent 311b3cd701
commit 56d0d9ec87
7 changed files with 24 additions and 15 deletions

View File

@@ -1839,7 +1839,7 @@ func (a *Account) addServiceImportSub(si *serviceImport) error {
cb := func(sub *subscription, c *client, subject, reply string, msg []byte) {
c.processServiceImport(si, a, msg)
}
_, err := c.processSub([]byte(subject), nil, []byte(sid), cb, true)
_, err := c.processSubEx([]byte(subject), nil, []byte(sid), cb, true, true)
return err
}

View File

@@ -481,6 +481,7 @@ type subscription struct {
client *client
im *streamImport // This is for import stream support.
rsi bool
si bool
shadow []*subscription // This is to track shadowed accounts.
icb msgHandler
subject []byte
@@ -2331,8 +2332,12 @@ func (c *client) parseSub(argo []byte, noForward bool) error {
}
func (c *client) processSub(subject, queue, bsid []byte, cb msgHandler, noForward bool) (*subscription, error) {
return c.processSubEx(subject, queue, bsid, cb, noForward, false)
}
func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForward, si bool) (*subscription, error) {
// Create the subscription
sub := &subscription{client: c, subject: subject, queue: queue, sid: bsid, icb: cb}
sub := &subscription{client: c, subject: subject, queue: queue, sid: bsid, icb: cb, si: si}
c.mu.Lock()
@@ -3707,7 +3712,7 @@ func getHeader(key string, hdr []byte) []byte {
func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) {
// If we are a GW and this is not a direct serviceImport ignore.
isResponse := si.isRespServiceImport()
if c.kind == GATEWAY && !isResponse {
if (c.kind == GATEWAY || c.kind == ROUTER) && !isResponse {
return
}
// If we are here and we are a serviceImport response make sure we are not matching back

View File

@@ -1630,11 +1630,10 @@ func TestSystemAccountWithGateways(t *testing.T) {
msg := findMsgs(fmt.Sprintf("$SYS.ACCOUNT.%s.CONNECT", sa.SystemAccount().Name))
var bMsg *nats.Msg
if len(msg) != 1 {
t.Fatal("Expected one message")
} else {
bMsg = msg[0]
if len(msg) < 1 {
t.Fatal("Expected at least one message")
}
bMsg = msg[len(msg)-1]
require_Contains(t, string(bMsg.Data), sb.ID())
require_Contains(t, string(bMsg.Data), `"cluster":"B"`)

View File

@@ -2213,6 +2213,10 @@ func (s *Server) sendQueueSubOrUnsubToGateways(accName string, qsub *subscriptio
// <Invoked from client or route connection's readLoop or when such
// connection is closed>
func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, change int32) {
if sub.si {
return
}
var (
keya [1024]byte
key = keya[:0]

View File

@@ -638,6 +638,10 @@ var allJsExports = []string{
}
func (js *jetStream) apiDispatch(sub *subscription, c *client, subject, reply string, rmsg []byte) {
hdr, _ := c.msgParts(rmsg)
if len(getHeader(ClientInfoHdr, hdr)) == 0 {
return
}
js.mu.RLock()
s := js.srv
rr := js.apiSubs.Match(subject)

View File

@@ -4235,9 +4235,6 @@ func TestJWTJetStreamLimits(t *testing.T) {
// create system account
sysKp, _ := nkeys.CreateAccount()
sysPub, _ := sysKp.PublicKey()
claim := jwt.NewAccountClaims(sysPub)
sysJwt, err := claim.Encode(oKp)
require_NoError(t, err)
sysUKp, _ := nkeys.CreateUser()
sysUSeed, _ := sysUKp.Seed()
uclaim := newJWTTestUserClaims()
@@ -4255,7 +4252,7 @@ func TestJWTJetStreamLimits(t *testing.T) {
// create account using jetstream with both limits
akp, _ := nkeys.CreateAccount()
aPub, _ := akp.PublicKey()
claim = jwt.NewAccountClaims(aPub)
claim := jwt.NewAccountClaims(aPub)
claim.Limits.JetStreamLimits = limits1
aJwt1, err := claim.Encode(oKp)
require_NoError(t, err)
@@ -4292,10 +4289,6 @@ func TestJWTJetStreamLimits(t *testing.T) {
s, opts := RunServerWithConfig(conf)
defer s.Shutdown()
port := opts.Port
updateJwt(s.ClientURL(), sysCreds, sysPub, sysJwt)
sys := natsConnect(t, s.ClientURL(), nats.UserCredentials(sysCreds))
expect_InfoError(sys)
sys.Close()
updateJwt(s.ClientURL(), sysCreds, aPub, aJwt1)
c := natsConnect(t, s.ClientURL(), nats.UserCredentials(userCreds), nats.ReconnectWait(200*time.Millisecond))
defer c.Close()

View File

@@ -1498,6 +1498,10 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
return
}
if sub.si {
return
}
// Copy to hold outside acc lock.
var n int32
var ok bool