diff --git a/server/accounts.go b/server/accounts.go index 7f397bc7..c739d0ea 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1,4 +1,4 @@ -// Copyright 2018-2022 The NATS Authors +// Copyright 2018-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -74,6 +74,7 @@ type Account struct { usersRevoked map[string]int64 mappings []*mapping lleafs []*client + leafClusters map[string]uint64 imports importMap exports exportMap js *jsAccount @@ -921,6 +922,29 @@ func (a *Account) addClient(c *client) int { return n } +// For registering clusters for remote leafnodes. +// We only register as the hub. +func (a *Account) registerLeafNodeCluster(cluster string) { + a.mu.Lock() + defer a.mu.Unlock() + if a.leafClusters == nil { + a.leafClusters = make(map[string]uint64) + } + a.leafClusters[cluster]++ +} + +// Check to see if this cluster is isolated, meaning the only one. +// Read Lock should be held. +func (a *Account) isLeafNodeClusterIsolated(cluster string) bool { + if cluster == _EMPTY_ { + return false + } + if len(a.leafClusters) > 1 { + return false + } + return a.leafClusters[cluster] > 0 +} + // Helper function to remove leaf nodes. If number of leafnodes gets large // this may need to be optimized out of linear search but believe number // of active leafnodes per account scope to be small and therefore cache friendly. @@ -935,6 +959,15 @@ func (a *Account) removeLeafNode(c *client) { } else { a.lleafs = a.lleafs[:ll-1] } + // Do cluster accounting if we are a hub. + if l.isHubLeafNode() { + cluster := l.remoteCluster() + if count := a.leafClusters[cluster]; count > 1 { + a.leafClusters[cluster]-- + } else if count == 1 { + delete(a.leafClusters, cluster) + } + } return } } diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 2dd0612e..505b8293 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -115,6 +115,13 @@ var jsClusterAccountsTempl = ` routes = [%s] } + websocket { + listen: 127.0.0.1:-1 + compression: true + handshake_timeout: "5s" + no_tls: true + } + no_auth_user: one accounts { @@ -904,6 +911,18 @@ var jsClusterTemplWithSingleLeafNode = ` accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } ` +var jsClusterTemplWithSingleFleetLeafNode = ` + listen: 127.0.0.1:-1 + server_name: %s + cluster: { name: fleet } + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + {{leaf}} + + # For access to system account. + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } +` + var jsClusterTemplWithSingleLeafNodeNoJS = ` listen: 127.0.0.1:-1 server_name: %s @@ -972,8 +991,12 @@ func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server { } func (c *cluster) createLeafNodeWithTemplateNoSystem(name, template string) *Server { + return c.createLeafNodeWithTemplateNoSystemWithProto(name, template, "nats") +} + +func (c *cluster) createLeafNodeWithTemplateNoSystemWithProto(name, template, proto string) *Server { c.t.Helper() - tmpl := c.createLeafSolicitNoSystem(template) + tmpl := c.createLeafSolicitNoSystemWithProto(template, proto) conf := fmt.Sprintf(tmpl, name, c.t.TempDir()) s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf))) c.servers = append(c.servers, s) @@ -983,6 +1006,10 @@ func (c *cluster) createLeafNodeWithTemplateNoSystem(name, template string) *Ser // Helper to generate the leaf solicit configs. func (c *cluster) createLeafSolicit(tmpl string) string { + return c.createLeafSolicitWithProto(tmpl, "nats") +} + +func (c *cluster) createLeafSolicitWithProto(tmpl, proto string) string { c.t.Helper() // Create our leafnode cluster template first. @@ -992,8 +1019,8 @@ func (c *cluster) createLeafSolicit(tmpl string) string { continue } ln := s.getOpts().LeafNode - lns = append(lns, fmt.Sprintf("nats://%s:%d", ln.Host, ln.Port)) - lnss = append(lnss, fmt.Sprintf("nats://admin:s3cr3t!@%s:%d", ln.Host, ln.Port)) + lns = append(lns, fmt.Sprintf("%s://%s:%d", proto, ln.Host, ln.Port)) + lnss = append(lnss, fmt.Sprintf("%s://admin:s3cr3t!@%s:%d", proto, ln.Host, ln.Port)) } lnc := strings.Join(lns, ", ") lnsc := strings.Join(lnss, ", ") @@ -1001,19 +1028,26 @@ func (c *cluster) createLeafSolicit(tmpl string) string { return strings.Replace(tmpl, "{{leaf}}", lconf, 1) } -func (c *cluster) createLeafSolicitNoSystem(tmpl string) string { +func (c *cluster) createLeafSolicitNoSystemWithProto(tmpl, proto string) string { c.t.Helper() // Create our leafnode cluster template first. - var lns string + var lns []string for _, s := range c.servers { if s.ClusterName() != c.name { continue } - ln := s.getOpts().LeafNode - lns = fmt.Sprintf("nats://%s:%d", ln.Host, ln.Port) + switch proto { + case "nats", "tls": + ln := s.getOpts().LeafNode + lns = append(lns, fmt.Sprintf("%s://%s:%d", proto, ln.Host, ln.Port)) + case "ws", "wss": + ln := s.getOpts().Websocket + lns = append(lns, fmt.Sprintf("%s://%s:%d", proto, ln.Host, ln.Port)) + } } - return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(jsLeafNoSysFrag, lns), 1) + lnc := strings.Join(lns, ", ") + return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(jsLeafNoSysFrag, lnc), 1) } func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName string, numJsServers, numNonServers int, doJSConfig bool) *cluster { diff --git a/server/leafnode.go b/server/leafnode.go index 6be7657e..24dcde9a 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1,4 +1,4 @@ -// Copyright 2019-2022 The NATS Authors +// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -39,29 +39,31 @@ import ( "github.com/nats-io/nuid" ) -// Warning when user configures leafnode TLS insecure -const leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!" +const ( + // Warning when user configures leafnode TLS insecure + leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!" -// When a loop is detected, delay the reconnect of solicited connection. -const leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second + // When a loop is detected, delay the reconnect of solicited connection. + leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second -// When a server receives a message causing a permission violation, the -// connection is closed and it won't attempt to reconnect for that long. -const leafNodeReconnectAfterPermViolation = 30 * time.Second + // When a server receives a message causing a permission violation, the + // connection is closed and it won't attempt to reconnect for that long. + leafNodeReconnectAfterPermViolation = 30 * time.Second -// When we have the same cluster name as the hub. -const leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second + // When we have the same cluster name as the hub. + leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second -// Prefix for loop detection subject -const leafNodeLoopDetectionSubjectPrefix = "$LDS." + // Prefix for loop detection subject + leafNodeLoopDetectionSubjectPrefix = "$LDS." -// Path added to URL to indicate to WS server that the connection is a -// LEAF connection as opposed to a CLIENT. -const leafNodeWSPath = "/leafnode" + // Path added to URL to indicate to WS server that the connection is a + // LEAF connection as opposed to a CLIENT. + leafNodeWSPath = "/leafnode" -// This is the time the server will wait, when receiving a CONNECT, -// before closing the connection if the required minimum version is not met. -const leafNodeWaitBeforeClose = 5 * time.Second + // This is the time the server will wait, when receiving a CONNECT, + // before closing the connection if the required minimum version is not met. + leafNodeWaitBeforeClose = 5 * time.Second +) type leaf struct { // We have any auth stuff here for solicited connections. @@ -1579,6 +1581,11 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro c.mu.Unlock() + // Register the cluster, even if empty, as long as we are acting as a hub. + if !proto.Hub { + c.acc.registerLeafNodeCluster(proto.Cluster) + } + // Add in the leafnode here since we passed through auth at this point. s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true) @@ -1793,32 +1800,41 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) { return } - _l := [32]*client{} - leafs := _l[:0] + // Is this a loop detection subject. + isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix)) - // Grab all leaf nodes. Ignore a leafnode if sub's client is a leafnode and matches. - acc.mu.RLock() - for _, ln := range acc.lleafs { - if ln != sub.client { - leafs = append(leafs, ln) - } + // Capture the cluster even if its empty. + cluster := _EMPTY_ + if sub.origin != nil { + cluster = string(sub.origin) } + + acc.mu.RLock() + // If we have an isolated cluster we can return early, as long as it is not a loop detection subject. + // Empty clusters will return false for the check. + if !isLDS && acc.isLeafNodeClusterIsolated(cluster) { + acc.mu.RUnlock() + return + } + // Grab all leaf nodes. + const numStackClients = 64 + var _l [numStackClients]*client + leafs := append(_l[:0], acc.lleafs...) acc.mu.RUnlock() for _, ln := range leafs { - // Check to make sure this sub does not have an origin cluster than matches the leafnode. - ln.mu.Lock() - skip := (sub.origin != nil && string(sub.origin) == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject)) - // If skipped, make sure that we still let go the "$LDS." subscription that allows - // the detection of a loop. - if skip && bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix)) { - skip = false - } - ln.mu.Unlock() - if skip { + if ln == sub.client { continue } - ln.updateSmap(sub, delta) + // Check to make sure this sub does not have an origin cluster that matches the leafnode. + ln.mu.Lock() + skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject)) + ln.mu.Unlock() + // If skipped, make sure that we still let go the "$LDS." subscription that allows + // the detection of a loop. + if isLDS || !skip { + ln.updateSmap(sub, delta) + } } } diff --git a/server/norace_test.go b/server/norace_test.go index 74b2d834..041738ea 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -7810,3 +7810,57 @@ func TestNoRaceParallelStreamAndConsumerCreation(t *testing.T) { t.Fatalf("Expected only one consumer to be really created, got %d out of %d attempts", numConsumers, np) } } + +func TestNoRaceJetStreamClusterLeafnodeConnectPerf(t *testing.T) { + // Uncomment to run. Needs to be on a big machine. Do not want as part of Travis tests atm. + skip(t) + + tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: cloud, store_dir:", 1) + c := createJetStreamCluster(t, tmpl, "CLOUD", _EMPTY_, 3, 18033, true) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "STATE", + Subjects: []string{"STATE.GLOBAL.CELL1.*.>"}, + Replicas: 3, + }) + require_NoError(t, err) + + tmpl = strings.Replace(jsClusterTemplWithSingleFleetLeafNode, "store_dir:", "domain: vehicle, store_dir:", 1) + + var vinSerial int + genVIN := func() string { + vinSerial++ + return fmt.Sprintf("7PDSGAALXNN%06d", vinSerial) + } + + numVehicles := 500 + for i := 0; i < numVehicles; i++ { + start := time.Now() + vin := genVIN() + ln := c.createLeafNodeWithTemplateNoSystemWithProto(vin, tmpl, "ws") + nc, js := jsClientConnect(t, ln) + _, err := js.AddStream(&nats.StreamConfig{ + Name: "VEHICLE", + Subjects: []string{"STATE.GLOBAL.LOCAL.>"}, + Sources: []*nats.StreamSource{{ + Name: "STATE", + FilterSubject: fmt.Sprintf("STATE.GLOBAL.CELL1.%s.>", vin), + External: &nats.ExternalStream{ + APIPrefix: "$JS.cloud.API", + DeliverPrefix: fmt.Sprintf("DELIVER.STATE.GLOBAL.CELL1.%s", vin), + }, + }}, + }) + require_NoError(t, err) + // Create the sourced stream. + checkLeafNodeConnectedCount(t, ln, 1) + if elapsed := time.Since(start); elapsed > 2*time.Second { + t.Fatalf("Took too long to create leafnode %d connection: %v", i+1, elapsed) + } + nc.Close() + } +}