From b3e355c26390c74f6d97ad51e67d2307c0a5ebd7 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Thu, 15 Apr 2021 18:28:11 -0400 Subject: [PATCH] [fixed] sub ref count issue across leaf node connections This was caused by not sending subs across leaf node connections in some cases but sending unsub in all cases. This imbalance caused subscriptions to go away too soon. (ref count was off) Signed-off-by: Matthias Hanel --- server/client.go | 11 ++++++++--- server/leafnode.go | 13 ++++++++----- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/server/client.go b/server/client.go index c31e2877..151e2894 100644 --- a/server/client.go +++ b/server/client.go @@ -4568,6 +4568,7 @@ func (c *client) closeConnection(reason ClosedState) { // Unregister srv.removeClient(c) + notSpoke := !(kind == LEAF && c.isSpokeLeafNode()) // Update remote subscriptions. if acc != nil && (kind == CLIENT || kind == LEAF) { qsubs := map[string]*qsub{} @@ -4576,7 +4577,9 @@ func (c *client) closeConnection(reason ClosedState) { c.unsubscribe(acc, sub, true, false) // Update route as normal for a normal subscriber. if sub.queue == nil { - srv.updateRouteSubscriptionMap(acc, sub, -1) + if notSpoke { + srv.updateRouteSubscriptionMap(acc, sub, -1) + } srv.updateLeafNodes(acc, sub, -1) } else { // We handle queue subscribers special in case we @@ -4589,13 +4592,15 @@ func (c *client) closeConnection(reason ClosedState) { qsubs[key] = &qsub{sub, 1} } } - if srv.gateway.enabled { + if srv.gateway.enabled && notSpoke { srv.gatewayUpdateSubInterest(acc.Name, sub, -1) } } // Process any qsubs here. for _, esub := range qsubs { - srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n)) + if notSpoke { + srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n)) + } srv.updateLeafNodes(acc, esub.sub, -(esub.n)) } if prev := acc.removeClient(c); prev == 1 { diff --git a/server/leafnode.go b/server/leafnode.go index 48786a11..325a7933 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1651,6 +1651,7 @@ func (c *client) processLeafUnsub(arg []byte) error { } updateGWs := false + spoke := c.isSpokeLeafNode() // We store local subs by account and subject and optionally queue name. // LS- will have the arg exactly as the key. sub, ok := c.subs[string(arg)] @@ -1661,11 +1662,13 @@ func (c *client) processLeafUnsub(arg []byte) error { updateGWs = srv.gateway.enabled } - // If we are routing subtract from the route map for the associated account. - srv.updateRouteSubscriptionMap(acc, sub, -1) - // Gateways - if updateGWs { - srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + if !spoke { + // If we are routing subtract from the route map for the associated account. + srv.updateRouteSubscriptionMap(acc, sub, -1) + // Gateways + if updateGWs { + srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + } } // Now check on leafnode updates for other leaf nodes. srv.updateLeafNodes(acc, sub, -1)