diff --git a/server/client.go b/server/client.go index 112afdd4..8e133a54 100644 --- a/server/client.go +++ b/server/client.go @@ -2419,6 +2419,9 @@ func (c *client) addShadowSub(sub *subscription, im *streamImport, useFrom bool) // Update our route map here. c.srv.updateRouteSubscriptionMap(im.acc, &nsub, 1) + if c.srv.gateway.enabled { + c.srv.gatewayUpdateSubInterest(im.acc.Name, &nsub, 1) + } c.srv.updateLeafNodes(im.acc, &nsub, 1) return &nsub, nil @@ -2545,10 +2548,14 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool // Check to see if we have shadow subscriptions. var updateRoute bool + var updateGWs bool shadowSubs := sub.shadow sub.shadow = nil if len(shadowSubs) > 0 { updateRoute = (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil + if updateRoute { + updateGWs = c.srv.gateway.enabled + } } sub.close() c.mu.Unlock() @@ -2557,8 +2564,13 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool for _, nsub := range shadowSubs { if err := nsub.im.acc.sl.Remove(nsub); err != nil { c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name) - } else if updateRoute { - c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1) + } else { + if updateRoute { + c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1) + } + if updateGWs { + c.srv.gatewayUpdateSubInterest(nsub.im.acc.Name, nsub, -1) + } } // Now check on leafnode updates. c.srv.updateLeafNodes(nsub.im.acc, nsub, -1) diff --git a/server/const.go b/server/const.go index f3b7c0ce..ba96f83c 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.0-beta.24" + VERSION = "2.2.0-beta.25" // PROTO is the currently supported protocol. // 0 was the original diff --git a/test/leafnode_test.go b/test/leafnode_test.go index eb142437..c6833713 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -4087,3 +4087,120 @@ func TestLeafNodeAdvertiseInCluster(t *testing.T) { s2.Shutdown() expectNothing(t, lc) } + +func TestLeafNodeStreamAndShadowSubs(t *testing.T) { + conf1 := createConfFile(t, []byte(` + port: -1 + system_account: SYS + accounts { + SYS {} + A: { + users: [{ user: a, password: pwd, permissions: {publish: [A.b.>]} }] + exports: [{ stream: A.b.>, accounts: [B] }] + }, + B: { + users: [{ user: b, password: pwd, permissions: {subscribe: [ A.b.> ]}}] + imports: [{ stream: { account: A, subject: A.b.> } }] + } + } + gateway { + name: "A" + port: -1 + } + leafnodes { + port: -1 + authorization: { + users: [ + {user: a, password: pwd, account: A} + ] + } + } + `)) + defer os.Remove(conf1) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + system_account: SYS + accounts { + SYS {} + A: { + users: [{ user: a, password: pwd, permissions: {publish: [A.b.>]} }] + exports: [{ stream: A.b.>, accounts: [B] }] + }, + B: { + users: [{ user: b, password: pwd, permissions: {subscribe: [ A.b.> ]}}] + imports: [{ stream: { account: A, subject: A.b.> } }] + } + } + gateway { + name: "B" + port: -1 + gateways [ + { + name: "A" + urls: ["nats://127.0.0.1:%d"] + } + ] + } + `, o1.Gateway.Port))) + defer os.Remove(conf2) + s2, o2 := RunServerWithConfig(conf2) + defer s2.Shutdown() + + waitForOutboundGateways(t, s1, 1, 2*time.Second) + waitForOutboundGateways(t, s2, 1, 2*time.Second) + + nc, err := nats.Connect(fmt.Sprintf("nats://b:pwd@127.0.0.1:%d", o2.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + sub, err := nc.SubscribeSync("A.b.>") + if err != nil { + t.Fatalf("Error on subscibe: %v", err) + } + defer sub.Unsubscribe() + + conf3 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + system_account: SYS + accounts: { + SYS {} + C: { + imports: [{ stream: { account: D, subject: b.> }, prefix: A }] + } + D: { + users: [{ user: d, password: pwd, permissions: {publish: [ b.> ]} }] + exports: [{ stream: b.>, accounts: [C] }] + } + } + leafnodes { + remotes [ + { + url: "nats://a:pwd@127.0.0.1:%d" + account: C + } + ] + } + `, o1.LeafNode.Port))) + defer os.Remove(conf3) + s3, o3 := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkLeafNodeConnected(t, s1) + checkLeafNodeConnected(t, s3) + + ncl, err := nats.Connect(fmt.Sprintf("nats://d:pwd@127.0.0.1:%d", o3.Port)) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + defer ncl.Close() + + ncl.Publish("b.c", []byte("test")) + if _, err := sub.NextMsg(time.Second); err != nil { + t.Fatalf("Did not receive message: %v", err) + } +}