mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[fixed] sub ref count issue across leaf node connections
This was caused by not sending subs across leaf node connections in some cases but sending unsub in all cases. This imbalance caused subscriptions to go away too soon. (ref count was off) Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
committed by
Matthias Hanel
parent
35bf0e8ce5
commit
b3e355c263
@@ -4568,6 +4568,7 @@ func (c *client) closeConnection(reason ClosedState) {
|
||||
// Unregister
|
||||
srv.removeClient(c)
|
||||
|
||||
notSpoke := !(kind == LEAF && c.isSpokeLeafNode())
|
||||
// Update remote subscriptions.
|
||||
if acc != nil && (kind == CLIENT || kind == LEAF) {
|
||||
qsubs := map[string]*qsub{}
|
||||
@@ -4576,7 +4577,9 @@ func (c *client) closeConnection(reason ClosedState) {
|
||||
c.unsubscribe(acc, sub, true, false)
|
||||
// Update route as normal for a normal subscriber.
|
||||
if sub.queue == nil {
|
||||
srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
if notSpoke {
|
||||
srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
}
|
||||
srv.updateLeafNodes(acc, sub, -1)
|
||||
} else {
|
||||
// We handle queue subscribers special in case we
|
||||
@@ -4589,13 +4592,15 @@ func (c *client) closeConnection(reason ClosedState) {
|
||||
qsubs[key] = &qsub{sub, 1}
|
||||
}
|
||||
}
|
||||
if srv.gateway.enabled {
|
||||
if srv.gateway.enabled && notSpoke {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
|
||||
}
|
||||
}
|
||||
// Process any qsubs here.
|
||||
for _, esub := range qsubs {
|
||||
srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n))
|
||||
if notSpoke {
|
||||
srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n))
|
||||
}
|
||||
srv.updateLeafNodes(acc, esub.sub, -(esub.n))
|
||||
}
|
||||
if prev := acc.removeClient(c); prev == 1 {
|
||||
|
||||
@@ -1651,6 +1651,7 @@ func (c *client) processLeafUnsub(arg []byte) error {
|
||||
}
|
||||
|
||||
updateGWs := false
|
||||
spoke := c.isSpokeLeafNode()
|
||||
// We store local subs by account and subject and optionally queue name.
|
||||
// LS- will have the arg exactly as the key.
|
||||
sub, ok := c.subs[string(arg)]
|
||||
@@ -1661,11 +1662,13 @@ func (c *client) processLeafUnsub(arg []byte) error {
|
||||
updateGWs = srv.gateway.enabled
|
||||
}
|
||||
|
||||
// If we are routing subtract from the route map for the associated account.
|
||||
srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
// Gateways
|
||||
if updateGWs {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
|
||||
if !spoke {
|
||||
// If we are routing subtract from the route map for the associated account.
|
||||
srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
// Gateways
|
||||
if updateGWs {
|
||||
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
|
||||
}
|
||||
}
|
||||
// Now check on leafnode updates for other leaf nodes.
|
||||
srv.updateLeafNodes(acc, sub, -1)
|
||||
|
||||
Reference in New Issue
Block a user