From 26cd1f99abdf63b50332d13d97731db6b32f885d Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 13 Oct 2020 18:10:27 -0600 Subject: [PATCH] [FIXED] Stream's subscription propagation issue with gateways When creating shadow subscriptions for import streams, we were not invoking code for gateway subscription accounting, which means that when the account (for leafnodes) was switched to interest only, those shadow subscriptions were not sent. Signed-off-by: Ivan Kozlovic --- server/client.go | 16 +++++- server/const.go | 2 +- test/leafnode_test.go | 117 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 3 deletions(-) diff --git a/server/client.go b/server/client.go index 112afdd4..8e133a54 100644 --- a/server/client.go +++ b/server/client.go @@ -2419,6 +2419,9 @@ func (c *client) addShadowSub(sub *subscription, im *streamImport, useFrom bool) // Update our route map here. c.srv.updateRouteSubscriptionMap(im.acc, &nsub, 1) + if c.srv.gateway.enabled { + c.srv.gatewayUpdateSubInterest(im.acc.Name, &nsub, 1) + } c.srv.updateLeafNodes(im.acc, &nsub, 1) return &nsub, nil @@ -2545,10 +2548,14 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool // Check to see if we have shadow subscriptions. var updateRoute bool + var updateGWs bool shadowSubs := sub.shadow sub.shadow = nil if len(shadowSubs) > 0 { updateRoute = (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil + if updateRoute { + updateGWs = c.srv.gateway.enabled + } } sub.close() c.mu.Unlock() @@ -2557,8 +2564,13 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool for _, nsub := range shadowSubs { if err := nsub.im.acc.sl.Remove(nsub); err != nil { c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name) - } else if updateRoute { - c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1) + } else { + if updateRoute { + c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1) + } + if updateGWs { + c.srv.gatewayUpdateSubInterest(nsub.im.acc.Name, nsub, -1) + } } // Now check on leafnode updates. c.srv.updateLeafNodes(nsub.im.acc, nsub, -1) diff --git a/server/const.go b/server/const.go index f3b7c0ce..ba96f83c 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.0-beta.24" + VERSION = "2.2.0-beta.25" // PROTO is the currently supported protocol. // 0 was the original diff --git a/test/leafnode_test.go b/test/leafnode_test.go index eb142437..c6833713 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -4087,3 +4087,120 @@ func TestLeafNodeAdvertiseInCluster(t *testing.T) { s2.Shutdown() expectNothing(t, lc) } + +func TestLeafNodeStreamAndShadowSubs(t *testing.T) { + conf1 := createConfFile(t, []byte(` + port: -1 + system_account: SYS + accounts { + SYS {} + A: { + users: [{ user: a, password: pwd, permissions: {publish: [A.b.>]} }] + exports: [{ stream: A.b.>, accounts: [B] }] + }, + B: { + users: [{ user: b, password: pwd, permissions: {subscribe: [ A.b.> ]}}] + imports: [{ stream: { account: A, subject: A.b.> } }] + } + } + gateway { + name: "A" + port: -1 + } + leafnodes { + port: -1 + authorization: { + users: [ + {user: a, password: pwd, account: A} + ] + } + } + `)) + defer os.Remove(conf1) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + system_account: SYS + accounts { + SYS {} + A: { + users: [{ user: a, password: pwd, permissions: {publish: [A.b.>]} }] + exports: [{ stream: A.b.>, accounts: [B] }] + }, + B: { + users: [{ user: b, password: pwd, permissions: {subscribe: [ A.b.> ]}}] + imports: [{ stream: { account: A, subject: A.b.> } }] + } + } + gateway { + name: "B" + port: -1 + gateways [ + { + name: "A" + urls: ["nats://127.0.0.1:%d"] + } + ] + } + `, o1.Gateway.Port))) + defer os.Remove(conf2) + s2, o2 := RunServerWithConfig(conf2) + defer s2.Shutdown() + + waitForOutboundGateways(t, s1, 1, 2*time.Second) + waitForOutboundGateways(t, s2, 1, 2*time.Second) + + nc, err := nats.Connect(fmt.Sprintf("nats://b:pwd@127.0.0.1:%d", o2.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + sub, err := nc.SubscribeSync("A.b.>") + if err != nil { + t.Fatalf("Error on subscibe: %v", err) + } + defer sub.Unsubscribe() + + conf3 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + system_account: SYS + accounts: { + SYS {} + C: { + imports: [{ stream: { account: D, subject: b.> }, prefix: A }] + } + D: { + users: [{ user: d, password: pwd, permissions: {publish: [ b.> ]} }] + exports: [{ stream: b.>, accounts: [C] }] + } + } + leafnodes { + remotes [ + { + url: "nats://a:pwd@127.0.0.1:%d" + account: C + } + ] + } + `, o1.LeafNode.Port))) + defer os.Remove(conf3) + s3, o3 := RunServerWithConfig(conf3) + defer s3.Shutdown() + + checkLeafNodeConnected(t, s1) + checkLeafNodeConnected(t, s3) + + ncl, err := nats.Connect(fmt.Sprintf("nats://d:pwd@127.0.0.1:%d", o3.Port)) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + defer ncl.Close() + + ncl.Publish("b.c", []byte("test")) + if _, err := sub.NextMsg(time.Second); err != nil { + t.Fatalf("Did not receive message: %v", err) + } +}