Merge pull request #1641 from nats-io/fix_gw_shadow_subs

[FIXED] Stream's subscription propagation issue with gateways
This commit is contained in:
Ivan Kozlovic
2020-10-13 19:21:24 -06:00
committed by GitHub
3 changed files with 132 additions and 3 deletions

View File

@@ -2419,6 +2419,9 @@ func (c *client) addShadowSub(sub *subscription, im *streamImport, useFrom bool)
// Update our route map here.
c.srv.updateRouteSubscriptionMap(im.acc, &nsub, 1)
if c.srv.gateway.enabled {
c.srv.gatewayUpdateSubInterest(im.acc.Name, &nsub, 1)
}
c.srv.updateLeafNodes(im.acc, &nsub, 1)
return &nsub, nil
@@ -2545,10 +2548,14 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
// Check to see if we have shadow subscriptions.
var updateRoute bool
var updateGWs bool
shadowSubs := sub.shadow
sub.shadow = nil
if len(shadowSubs) > 0 {
updateRoute = (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil
if updateRoute {
updateGWs = c.srv.gateway.enabled
}
}
sub.close()
c.mu.Unlock()
@@ -2557,8 +2564,13 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
for _, nsub := range shadowSubs {
if err := nsub.im.acc.sl.Remove(nsub); err != nil {
c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name)
} else if updateRoute {
c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1)
} else {
if updateRoute {
c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1)
}
if updateGWs {
c.srv.gatewayUpdateSubInterest(nsub.im.acc.Name, nsub, -1)
}
}
// Now check on leafnode updates.
c.srv.updateLeafNodes(nsub.im.acc, nsub, -1)

View File

@@ -40,7 +40,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.2.0-beta.24"
VERSION = "2.2.0-beta.25"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -4087,3 +4087,120 @@ func TestLeafNodeAdvertiseInCluster(t *testing.T) {
s2.Shutdown()
expectNothing(t, lc)
}
func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
conf1 := createConfFile(t, []byte(`
port: -1
system_account: SYS
accounts {
SYS {}
A: {
users: [{ user: a, password: pwd, permissions: {publish: [A.b.>]} }]
exports: [{ stream: A.b.>, accounts: [B] }]
},
B: {
users: [{ user: b, password: pwd, permissions: {subscribe: [ A.b.> ]}}]
imports: [{ stream: { account: A, subject: A.b.> } }]
}
}
gateway {
name: "A"
port: -1
}
leafnodes {
port: -1
authorization: {
users: [
{user: a, password: pwd, account: A}
]
}
}
`))
defer os.Remove(conf1)
s1, o1 := RunServerWithConfig(conf1)
defer s1.Shutdown()
conf2 := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
system_account: SYS
accounts {
SYS {}
A: {
users: [{ user: a, password: pwd, permissions: {publish: [A.b.>]} }]
exports: [{ stream: A.b.>, accounts: [B] }]
},
B: {
users: [{ user: b, password: pwd, permissions: {subscribe: [ A.b.> ]}}]
imports: [{ stream: { account: A, subject: A.b.> } }]
}
}
gateway {
name: "B"
port: -1
gateways [
{
name: "A"
urls: ["nats://127.0.0.1:%d"]
}
]
}
`, o1.Gateway.Port)))
defer os.Remove(conf2)
s2, o2 := RunServerWithConfig(conf2)
defer s2.Shutdown()
waitForOutboundGateways(t, s1, 1, 2*time.Second)
waitForOutboundGateways(t, s2, 1, 2*time.Second)
nc, err := nats.Connect(fmt.Sprintf("nats://b:pwd@127.0.0.1:%d", o2.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
sub, err := nc.SubscribeSync("A.b.>")
if err != nil {
t.Fatalf("Error on subscibe: %v", err)
}
defer sub.Unsubscribe()
conf3 := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
system_account: SYS
accounts: {
SYS {}
C: {
imports: [{ stream: { account: D, subject: b.> }, prefix: A }]
}
D: {
users: [{ user: d, password: pwd, permissions: {publish: [ b.> ]} }]
exports: [{ stream: b.>, accounts: [C] }]
}
}
leafnodes {
remotes [
{
url: "nats://a:pwd@127.0.0.1:%d"
account: C
}
]
}
`, o1.LeafNode.Port)))
defer os.Remove(conf3)
s3, o3 := RunServerWithConfig(conf3)
defer s3.Shutdown()
checkLeafNodeConnected(t, s1)
checkLeafNodeConnected(t, s3)
ncl, err := nats.Connect(fmt.Sprintf("nats://d:pwd@127.0.0.1:%d", o3.Port))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncl.Close()
ncl.Publish("b.c", []byte("test"))
if _, err := sub.NextMsg(time.Second); err != nil {
t.Fatalf("Did not receive message: %v", err)
}
}