[FIXED] Stream's subscription propagation issue with gateways

When creating shadow subscriptions for import streams, we were
not invoking code for gateway subscription accounting, which means
that when the account (for leafnodes) was switched to interest
only, those shadow subscriptions were not sent.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2020-10-13 18:10:27 -06:00
parent 99cb1fb3f0
commit 26cd1f99ab
3 changed files with 132 additions and 3 deletions

View File

@@ -4087,3 +4087,120 @@ func TestLeafNodeAdvertiseInCluster(t *testing.T) {
s2.Shutdown()
expectNothing(t, lc)
}
func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
conf1 := createConfFile(t, []byte(`
port: -1
system_account: SYS
accounts {
SYS {}
A: {
users: [{ user: a, password: pwd, permissions: {publish: [A.b.>]} }]
exports: [{ stream: A.b.>, accounts: [B] }]
},
B: {
users: [{ user: b, password: pwd, permissions: {subscribe: [ A.b.> ]}}]
imports: [{ stream: { account: A, subject: A.b.> } }]
}
}
gateway {
name: "A"
port: -1
}
leafnodes {
port: -1
authorization: {
users: [
{user: a, password: pwd, account: A}
]
}
}
`))
defer os.Remove(conf1)
s1, o1 := RunServerWithConfig(conf1)
defer s1.Shutdown()
conf2 := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
system_account: SYS
accounts {
SYS {}
A: {
users: [{ user: a, password: pwd, permissions: {publish: [A.b.>]} }]
exports: [{ stream: A.b.>, accounts: [B] }]
},
B: {
users: [{ user: b, password: pwd, permissions: {subscribe: [ A.b.> ]}}]
imports: [{ stream: { account: A, subject: A.b.> } }]
}
}
gateway {
name: "B"
port: -1
gateways [
{
name: "A"
urls: ["nats://127.0.0.1:%d"]
}
]
}
`, o1.Gateway.Port)))
defer os.Remove(conf2)
s2, o2 := RunServerWithConfig(conf2)
defer s2.Shutdown()
waitForOutboundGateways(t, s1, 1, 2*time.Second)
waitForOutboundGateways(t, s2, 1, 2*time.Second)
nc, err := nats.Connect(fmt.Sprintf("nats://b:pwd@127.0.0.1:%d", o2.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
sub, err := nc.SubscribeSync("A.b.>")
if err != nil {
t.Fatalf("Error on subscibe: %v", err)
}
defer sub.Unsubscribe()
conf3 := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
system_account: SYS
accounts: {
SYS {}
C: {
imports: [{ stream: { account: D, subject: b.> }, prefix: A }]
}
D: {
users: [{ user: d, password: pwd, permissions: {publish: [ b.> ]} }]
exports: [{ stream: b.>, accounts: [C] }]
}
}
leafnodes {
remotes [
{
url: "nats://a:pwd@127.0.0.1:%d"
account: C
}
]
}
`, o1.LeafNode.Port)))
defer os.Remove(conf3)
s3, o3 := RunServerWithConfig(conf3)
defer s3.Shutdown()
checkLeafNodeConnected(t, s1)
checkLeafNodeConnected(t, s3)
ncl, err := nats.Connect(fmt.Sprintf("nats://d:pwd@127.0.0.1:%d", o3.Port))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncl.Close()
ncl.Publish("b.c", []byte("test"))
if _, err := sub.NextMsg(time.Second); err != nil {
t.Fatalf("Did not receive message: %v", err)
}
}