mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Merge branch 'main' into dev
This commit is contained in:
@@ -1673,11 +1673,11 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
|
||||
return
|
||||
}
|
||||
// Collect all account subs here.
|
||||
_subs := [32]*subscription{}
|
||||
_subs := [1024]*subscription{}
|
||||
subs := _subs[:0]
|
||||
ims := []string{}
|
||||
|
||||
acc.mu.Lock()
|
||||
acc.mu.RLock()
|
||||
accName := acc.Name
|
||||
accNTag := acc.nameTag
|
||||
|
||||
@@ -1716,11 +1716,15 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
|
||||
|
||||
// Create a unique subject that will be used for loop detection.
|
||||
lds := acc.lds
|
||||
acc.mu.RUnlock()
|
||||
|
||||
// Check if we have to create the LDS.
|
||||
if lds == _EMPTY_ {
|
||||
lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
|
||||
acc.mu.Lock()
|
||||
acc.lds = lds
|
||||
acc.mu.Unlock()
|
||||
}
|
||||
acc.mu.Unlock()
|
||||
|
||||
// Now check for gateway interest. Leafnodes will put this into
|
||||
// the proper mode to propagate, but they are not held in the account.
|
||||
@@ -1824,16 +1828,28 @@ func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscrip
|
||||
s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
|
||||
return
|
||||
}
|
||||
s.updateLeafNodes(acc, sub, delta)
|
||||
acc.updateLeafNodes(sub, delta)
|
||||
}
|
||||
|
||||
// updateLeafNodes will make sure to update the smap for the subscription. Will
|
||||
// also forward to all leaf nodes as needed.
|
||||
func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
|
||||
// updateLeafNodes will make sure to update the account smap for the subscription.
|
||||
// Will also forward to all leaf nodes as needed.
|
||||
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
|
||||
if acc == nil || sub == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// We will do checks for no leafnodes and same cluster here inline and under the
|
||||
// general account read lock.
|
||||
// If we feel we need to update the leafnodes we will do that out of line to avoid
|
||||
// blocking routes or GWs.
|
||||
|
||||
acc.mu.RLock()
|
||||
// First check if we even have leafnodes here.
|
||||
if acc.nleafs == 0 {
|
||||
acc.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Is this a loop detection subject.
|
||||
isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
|
||||
|
||||
@@ -1843,43 +1859,45 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
|
||||
cluster = string(sub.origin)
|
||||
}
|
||||
|
||||
acc.mu.RLock()
|
||||
// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
|
||||
// Empty clusters will return false for the check.
|
||||
if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
|
||||
acc.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
// Grab all leaf nodes.
|
||||
const numStackClients = 64
|
||||
var _l [numStackClients]*client
|
||||
leafs := append(_l[:0], acc.lleafs...)
|
||||
|
||||
// We can release the general account lock.
|
||||
acc.mu.RUnlock()
|
||||
|
||||
for _, ln := range leafs {
|
||||
// We can hold the list lock here to avoid having to copy a large slice.
|
||||
acc.lmu.RLock()
|
||||
defer acc.lmu.RUnlock()
|
||||
|
||||
// Do this once.
|
||||
subject := string(sub.subject)
|
||||
|
||||
// Walk the connected leafnodes.
|
||||
for _, ln := range acc.lleafs {
|
||||
if ln == sub.client {
|
||||
continue
|
||||
}
|
||||
// Check to make sure this sub does not have an origin cluster that matches the leafnode.
|
||||
ln.mu.Lock()
|
||||
skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject))
|
||||
ln.mu.Unlock()
|
||||
skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || (delta > 0 && !ln.canSubscribe(subject))
|
||||
// If skipped, make sure that we still let go the "$LDS." subscription that allows
|
||||
// the detection of a loop.
|
||||
if isLDS || !skip {
|
||||
ln.updateSmap(sub, delta)
|
||||
}
|
||||
ln.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// This will make an update to our internal smap and determine if we should send out
|
||||
// an interest update to the remote side.
|
||||
// Lock should be held.
|
||||
func (c *client) updateSmap(sub *subscription, delta int32) {
|
||||
key := keyFromSub(sub)
|
||||
|
||||
c.mu.Lock()
|
||||
if c.leaf.smap == nil {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1887,7 +1905,6 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
|
||||
skind := sub.client.kind
|
||||
updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
|
||||
if c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1900,12 +1917,16 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
|
||||
c.leaf.tsubt.Stop()
|
||||
c.leaf.tsubt = nil
|
||||
}
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
n := c.leaf.smap[key]
|
||||
key := keyFromSub(sub)
|
||||
n, ok := c.leaf.smap[key]
|
||||
if delta < 0 && !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
|
||||
update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
|
||||
n += delta
|
||||
@@ -1917,7 +1938,6 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
|
||||
if update {
|
||||
c.sendLeafNodeSubUpdate(key, n)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Used to force add subjects to the subject map.
|
||||
@@ -2131,7 +2151,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
}
|
||||
// Now check on leafnode updates for other leaf nodes. We understand solicited
|
||||
// and non-solicited state in this call so we will do the right thing.
|
||||
srv.updateLeafNodes(acc, sub, delta)
|
||||
acc.updateLeafNodes(sub, delta)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -2188,7 +2208,7 @@ func (c *client) processLeafUnsub(arg []byte) error {
|
||||
}
|
||||
}
|
||||
// Now check on leafnode updates for other leaf nodes.
|
||||
srv.updateLeafNodes(acc, sub, -1)
|
||||
acc.updateLeafNodes(sub, -1)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user