[IMPROVED] Updating of a large fleet of leafnodes. (#4117)

When a fleet of leafnodes are isolated (not routed but using same
cluster) we could do better at optimizing how we update the other
leafnodes since if they are all in the same cluster and we know we are
isolated we can skip.

We can improve further in 2.10.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-30 17:32:14 -07:00
committed by GitHub
4 changed files with 183 additions and 46 deletions

View File

@@ -1,4 +1,4 @@
// Copyright 2018-2022 The NATS Authors
// Copyright 2018-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -74,6 +74,7 @@ type Account struct {
usersRevoked map[string]int64
mappings []*mapping
lleafs []*client
leafClusters map[string]uint64
imports importMap
exports exportMap
js *jsAccount
@@ -921,6 +922,29 @@ func (a *Account) addClient(c *client) int {
return n
}
// For registering clusters for remote leafnodes.
// We only register as the hub.
func (a *Account) registerLeafNodeCluster(cluster string) {
a.mu.Lock()
defer a.mu.Unlock()
if a.leafClusters == nil {
a.leafClusters = make(map[string]uint64)
}
a.leafClusters[cluster]++
}
// Check to see if this cluster is isolated, meaning the only one.
// Read Lock should be held.
func (a *Account) isLeafNodeClusterIsolated(cluster string) bool {
if cluster == _EMPTY_ {
return false
}
if len(a.leafClusters) > 1 {
return false
}
return a.leafClusters[cluster] > 0
}
// Helper function to remove leaf nodes. If number of leafnodes gets large
// this may need to be optimized out of linear search but believe number
// of active leafnodes per account scope to be small and therefore cache friendly.
@@ -935,6 +959,15 @@ func (a *Account) removeLeafNode(c *client) {
} else {
a.lleafs = a.lleafs[:ll-1]
}
// Do cluster accounting if we are a hub.
if l.isHubLeafNode() {
cluster := l.remoteCluster()
if count := a.leafClusters[cluster]; count > 1 {
a.leafClusters[cluster]--
} else if count == 1 {
delete(a.leafClusters, cluster)
}
}
return
}
}

View File

@@ -115,6 +115,13 @@ var jsClusterAccountsTempl = `
routes = [%s]
}
websocket {
listen: 127.0.0.1:-1
compression: true
handshake_timeout: "5s"
no_tls: true
}
no_auth_user: one
accounts {
@@ -904,6 +911,18 @@ var jsClusterTemplWithSingleLeafNode = `
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
var jsClusterTemplWithSingleFleetLeafNode = `
listen: 127.0.0.1:-1
server_name: %s
cluster: { name: fleet }
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
{{leaf}}
# For access to system account.
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
var jsClusterTemplWithSingleLeafNodeNoJS = `
listen: 127.0.0.1:-1
server_name: %s
@@ -972,8 +991,12 @@ func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server {
}
func (c *cluster) createLeafNodeWithTemplateNoSystem(name, template string) *Server {
return c.createLeafNodeWithTemplateNoSystemWithProto(name, template, "nats")
}
func (c *cluster) createLeafNodeWithTemplateNoSystemWithProto(name, template, proto string) *Server {
c.t.Helper()
tmpl := c.createLeafSolicitNoSystem(template)
tmpl := c.createLeafSolicitNoSystemWithProto(template, proto)
conf := fmt.Sprintf(tmpl, name, c.t.TempDir())
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
c.servers = append(c.servers, s)
@@ -983,6 +1006,10 @@ func (c *cluster) createLeafNodeWithTemplateNoSystem(name, template string) *Ser
// Helper to generate the leaf solicit configs.
func (c *cluster) createLeafSolicit(tmpl string) string {
return c.createLeafSolicitWithProto(tmpl, "nats")
}
func (c *cluster) createLeafSolicitWithProto(tmpl, proto string) string {
c.t.Helper()
// Create our leafnode cluster template first.
@@ -992,8 +1019,8 @@ func (c *cluster) createLeafSolicit(tmpl string) string {
continue
}
ln := s.getOpts().LeafNode
lns = append(lns, fmt.Sprintf("nats://%s:%d", ln.Host, ln.Port))
lnss = append(lnss, fmt.Sprintf("nats://admin:s3cr3t!@%s:%d", ln.Host, ln.Port))
lns = append(lns, fmt.Sprintf("%s://%s:%d", proto, ln.Host, ln.Port))
lnss = append(lnss, fmt.Sprintf("%s://admin:s3cr3t!@%s:%d", proto, ln.Host, ln.Port))
}
lnc := strings.Join(lns, ", ")
lnsc := strings.Join(lnss, ", ")
@@ -1001,19 +1028,26 @@ func (c *cluster) createLeafSolicit(tmpl string) string {
return strings.Replace(tmpl, "{{leaf}}", lconf, 1)
}
func (c *cluster) createLeafSolicitNoSystem(tmpl string) string {
func (c *cluster) createLeafSolicitNoSystemWithProto(tmpl, proto string) string {
c.t.Helper()
// Create our leafnode cluster template first.
var lns string
var lns []string
for _, s := range c.servers {
if s.ClusterName() != c.name {
continue
}
ln := s.getOpts().LeafNode
lns = fmt.Sprintf("nats://%s:%d", ln.Host, ln.Port)
switch proto {
case "nats", "tls":
ln := s.getOpts().LeafNode
lns = append(lns, fmt.Sprintf("%s://%s:%d", proto, ln.Host, ln.Port))
case "ws", "wss":
ln := s.getOpts().Websocket
lns = append(lns, fmt.Sprintf("%s://%s:%d", proto, ln.Host, ln.Port))
}
}
return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(jsLeafNoSysFrag, lns), 1)
lnc := strings.Join(lns, ", ")
return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(jsLeafNoSysFrag, lnc), 1)
}
func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName string, numJsServers, numNonServers int, doJSConfig bool) *cluster {

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -39,29 +39,31 @@ import (
"github.com/nats-io/nuid"
)
// Warning when user configures leafnode TLS insecure
const leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"
const (
// Warning when user configures leafnode TLS insecure
leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"
// When a loop is detected, delay the reconnect of solicited connection.
const leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second
// When a loop is detected, delay the reconnect of solicited connection.
leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second
// When a server receives a message causing a permission violation, the
// connection is closed and it won't attempt to reconnect for that long.
const leafNodeReconnectAfterPermViolation = 30 * time.Second
// When a server receives a message causing a permission violation, the
// connection is closed and it won't attempt to reconnect for that long.
leafNodeReconnectAfterPermViolation = 30 * time.Second
// When we have the same cluster name as the hub.
const leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second
// When we have the same cluster name as the hub.
leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second
// Prefix for loop detection subject
const leafNodeLoopDetectionSubjectPrefix = "$LDS."
// Prefix for loop detection subject
leafNodeLoopDetectionSubjectPrefix = "$LDS."
// Path added to URL to indicate to WS server that the connection is a
// LEAF connection as opposed to a CLIENT.
const leafNodeWSPath = "/leafnode"
// Path added to URL to indicate to WS server that the connection is a
// LEAF connection as opposed to a CLIENT.
leafNodeWSPath = "/leafnode"
// This is the time the server will wait, when receiving a CONNECT,
// before closing the connection if the required minimum version is not met.
const leafNodeWaitBeforeClose = 5 * time.Second
// This is the time the server will wait, when receiving a CONNECT,
// before closing the connection if the required minimum version is not met.
leafNodeWaitBeforeClose = 5 * time.Second
)
type leaf struct {
// We have any auth stuff here for solicited connections.
@@ -1579,6 +1581,11 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
c.mu.Unlock()
// Register the cluster, even if empty, as long as we are acting as a hub.
if !proto.Hub {
c.acc.registerLeafNodeCluster(proto.Cluster)
}
// Add in the leafnode here since we passed through auth at this point.
s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)
@@ -1793,32 +1800,41 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
return
}
_l := [32]*client{}
leafs := _l[:0]
// Is this a loop detection subject.
isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
// Grab all leaf nodes. Ignore a leafnode if sub's client is a leafnode and matches.
acc.mu.RLock()
for _, ln := range acc.lleafs {
if ln != sub.client {
leafs = append(leafs, ln)
}
// Capture the cluster even if its empty.
cluster := _EMPTY_
if sub.origin != nil {
cluster = string(sub.origin)
}
acc.mu.RLock()
// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
// Empty clusters will return false for the check.
if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
acc.mu.RUnlock()
return
}
// Grab all leaf nodes.
const numStackClients = 64
var _l [numStackClients]*client
leafs := append(_l[:0], acc.lleafs...)
acc.mu.RUnlock()
for _, ln := range leafs {
// Check to make sure this sub does not have an origin cluster than matches the leafnode.
ln.mu.Lock()
skip := (sub.origin != nil && string(sub.origin) == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject))
// If skipped, make sure that we still let go the "$LDS." subscription that allows
// the detection of a loop.
if skip && bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix)) {
skip = false
}
ln.mu.Unlock()
if skip {
if ln == sub.client {
continue
}
ln.updateSmap(sub, delta)
// Check to make sure this sub does not have an origin cluster that matches the leafnode.
ln.mu.Lock()
skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject))
ln.mu.Unlock()
// If skipped, make sure that we still let go the "$LDS." subscription that allows
// the detection of a loop.
if isLDS || !skip {
ln.updateSmap(sub, delta)
}
}
}

View File

@@ -7810,3 +7810,57 @@ func TestNoRaceParallelStreamAndConsumerCreation(t *testing.T) {
t.Fatalf("Expected only one consumer to be really created, got %d out of %d attempts", numConsumers, np)
}
}
func TestNoRaceJetStreamClusterLeafnodeConnectPerf(t *testing.T) {
// Uncomment to run. Needs to be on a big machine. Do not want as part of Travis tests atm.
skip(t)
tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: cloud, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "CLOUD", _EMPTY_, 3, 18033, true)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "STATE",
Subjects: []string{"STATE.GLOBAL.CELL1.*.>"},
Replicas: 3,
})
require_NoError(t, err)
tmpl = strings.Replace(jsClusterTemplWithSingleFleetLeafNode, "store_dir:", "domain: vehicle, store_dir:", 1)
var vinSerial int
genVIN := func() string {
vinSerial++
return fmt.Sprintf("7PDSGAALXNN%06d", vinSerial)
}
numVehicles := 500
for i := 0; i < numVehicles; i++ {
start := time.Now()
vin := genVIN()
ln := c.createLeafNodeWithTemplateNoSystemWithProto(vin, tmpl, "ws")
nc, js := jsClientConnect(t, ln)
_, err := js.AddStream(&nats.StreamConfig{
Name: "VEHICLE",
Subjects: []string{"STATE.GLOBAL.LOCAL.>"},
Sources: []*nats.StreamSource{{
Name: "STATE",
FilterSubject: fmt.Sprintf("STATE.GLOBAL.CELL1.%s.>", vin),
External: &nats.ExternalStream{
APIPrefix: "$JS.cloud.API",
DeliverPrefix: fmt.Sprintf("DELIVER.STATE.GLOBAL.CELL1.%s", vin),
},
}},
})
require_NoError(t, err)
// Create the sourced stream.
checkLeafNodeConnectedCount(t, ln, 1)
if elapsed := time.Since(start); elapsed > 2*time.Second {
t.Fatalf("Took too long to create leafnode %d connection: %v", i+1, elapsed)
}
nc.Close()
}
}