Merge pull request #2225 from nats-io/double_import

Under double import scenarios we could possibly map to the wrong subject.
This commit is contained in:
Derek Collison
2021-05-19 08:50:59 -07:00
committed by GitHub
2 changed files with 71 additions and 11 deletions

View File

@@ -3771,7 +3771,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
didSendTL = acc.sendTrackingLatency(si, c)
}
// Pick correct to subject. If we matched on a wildcard use the literal publish subject.
// Pick correct "to" subject. If we matched on a wildcard use the literal publish subject.
to, subject := si.to, string(c.pa.subject)
hadPrevSi := c.pa.psi != nil
@@ -3779,21 +3779,21 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// FIXME(dlc) - This could be slow, may want to look at adding cache to bare transforms?
to, _ = si.tr.transformSubject(subject)
} else if si.usePub {
if hadPrevSi && c.pa.psi.tr != nil {
to, _ = c.pa.psi.tr.transformSubject(subject)
} else {
to = subject
}
to = subject
}
// Now check to see if this account has mappings that could affect the service import.
// Can't use non-locked trick like in processInboundClientMsg, so just call into selectMappedSubject
// so we only lock once.
to, _ = si.acc.selectMappedSubject(to)
// Copy our pubArg and account
pacopy := c.pa
oacc := c.acc
// Now check to see if this account has mappings that could affect the service import.
// Can't use non-locked trick like in processInboundClientMsg, so just call into selectMappedSubject
// so we only lock once.
if nsubj, changed := si.acc.selectMappedSubject(to); changed {
c.pa.mapped = []byte(to)
to = nsubj
}
// Change this so that we detect recursion
// Remember prior.
share := si.share
@@ -3823,7 +3823,10 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
}
}
// Set our reply.
// Set our optional subject(to) and reply.
if !si.response && to != subject {
c.pa.subject = []byte(to)
}
c.pa.reply = nrr
c.mu.Lock()
c.acc = si.acc

View File

@@ -6272,6 +6272,7 @@ func TestJetStreamClusterDomainsAndAPIResponses(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error getting JetStream context: %v", err)
}
si, err := jsSpoke.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
@@ -6763,6 +6764,62 @@ func TestJetStreamClusterUpdateStreamToExisting(t *testing.T) {
}
}
func TestJetStreamClusterCrossAccountInterop(t *testing.T) {
template := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts {
JS {
jetstream: enabled
users = [ { user: "rip", pass: "pass" } ]
exports [
{ service: "$JS.API.CONSUMER.INFO.>" }
]
}
IA {
jetstream: enabled
users = [ { user: "dlc", pass: "pass" } ]
imports [
{ service: { account: JS, subject: "$JS.API.CONSUMER.INFO.TEST.DLC"}, to: "FROM.DLC" }
]
}
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`
c := createJetStreamClusterWithTemplate(t, template, "C22", 3)
defer c.shutdown()
// Create the stream and the consumer under the JS/rip user.
s := c.randomServer()
nc, js := jsClientConnect(t, s, nats.UserInfo("rip", "pass"))
defer nc.Close()
if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 2}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err := js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "DLC", AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now we want to access the consumer info from IA/dlc.
nc, _ = jsClientConnect(t, c.randomServer(), nats.UserInfo("dlc", "pass"))
defer nc.Close()
if _, err := nc.Request("FROM.DLC", nil, time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// Support functions
// Used to setup superclusters for tests.