diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index a606ba1e..d5c86f20 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5518,6 +5518,32 @@ func TestJetStreamClusterMixedMode(t *testing.T) { }) } +func TestJetStreamClusterLeafnodeSpokes(t *testing.T) { + c := createJetStreamClusterExplicit(t, "HUB", 3) + defer c.shutdown() + + lnc1 := c.createLeafNodesWithStartPort("R1", 3, 22110) + defer lnc1.shutdown() + + lnc2 := c.createLeafNodesWithStartPort("R2", 3, 22120) + defer lnc2.shutdown() + + lnc3 := c.createLeafNodesWithStartPort("R3", 3, 22130) + defer lnc3.shutdown() + + // Wait on all peers. + c.waitOnPeerCount(12) + + // Make sure shrinking works. + lnc3.shutdown() + c.waitOnPeerCount(9) + + lnc3 = c.createLeafNodesWithStartPort("LNC3", 3, 22130) + defer lnc3.shutdown() + + c.waitOnPeerCount(12) +} + func TestJetStreamClusterSuperClusterAndLeafNodesWithSharedSystemAccount(t *testing.T) { sc := createJetStreamSuperCluster(t, 3, 2) defer sc.shutdown() @@ -5969,51 +5995,10 @@ func createJetStreamSuperCluster(t *testing.T, numServersPer, numClusters int) * return sc } -var jsClusterTemplWithLeafNode = ` - listen: 127.0.0.1:-1 - server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} - - {{leaf}} - - cluster { - name: %s - listen: 127.0.0.1:%d - routes = [%s] - } - - # For access to system account. - accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } -` - -var jsLeafFrag = ` - leaf { - remotes [ - { urls: [ %s ], deny_exports: ["$JS.API.>"] } - { urls: [ %s ], account: "$SYS" } - ] - } -` - func (sc *supercluster) createLeafNodes(clusterName string, numServers int) *cluster { // Create our leafnode cluster template first. c := sc.randomCluster() - var lns, lnss []string - for _, s := range c.servers { - ln := s.getOpts().LeafNode - lns = append(lns, fmt.Sprintf("nats://%s:%d", ln.Host, ln.Port)) - lnss = append(lnss, fmt.Sprintf("nats://admin:s3cr3t!@%s:%d", ln.Host, ln.Port)) - } - lnc := strings.Join(lns, ", ") - lnsc := strings.Join(lnss, ", ") - lconf := fmt.Sprintf(jsLeafFrag, lnc, lnsc) - tmpl := strings.Replace(jsClusterTemplWithLeafNode, "{{leaf}}", lconf, 1) - - lc := createJetStreamCluster(sc.t, tmpl, clusterName, numServers, false) - for _, s := range lc.servers { - checkLeafNodeConnectedCount(sc.t, s, 2) - } - return lc + return c.createLeafNodes(clusterName, numServers) } func (sc *supercluster) leader() *Server { @@ -6194,19 +6179,19 @@ func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers } func createJetStreamClusterWithTemplate(t *testing.T, tmpl string, clusterName string, numServers int) *cluster { - return createJetStreamCluster(t, tmpl, clusterName, numServers, true) + const startClusterPort = 22332 + return createJetStreamCluster(t, tmpl, clusterName, _EMPTY_, numServers, startClusterPort, true) } -func createJetStreamCluster(t *testing.T, tmpl string, clusterName string, numServers int, waitOnReady bool) *cluster { +func createJetStreamCluster(t *testing.T, tmpl string, clusterName string, snPre string, numServers int, portStart int, waitOnReady bool) *cluster { t.Helper() if clusterName == "" || numServers < 1 { t.Fatalf("Bad params") } - const startClusterPort = 22332 // Build out the routes that will be shared with all configs. var routes []string - for cp := startClusterPort; cp < startClusterPort+numServers; cp++ { + for cp := portStart; cp < portStart+numServers; cp++ { routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp)) } routeConfig := strings.Join(routes, ",") @@ -6214,9 +6199,9 @@ func createJetStreamCluster(t *testing.T, tmpl string, clusterName string, numSe // Go ahead and build configurations and start servers. c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName} - for cp := startClusterPort; cp < startClusterPort+numServers; cp++ { + for cp := portStart; cp < portStart+numServers; cp++ { storeDir := createDir(t, JetStreamStoreDir) - sn := fmt.Sprintf("S-%d", cp-startClusterPort+1) + sn := fmt.Sprintf("%sS-%d", snPre, cp-portStart+1) conf := fmt.Sprintf(tmpl, sn, storeDir, clusterName, cp, routeConfig) s, o := RunServerWithConfig(createConfFile(t, []byte(conf))) c.servers = append(c.servers, s) @@ -6246,6 +6231,59 @@ func (c *cluster) addInNewServer() *Server { return s } +var jsClusterTemplWithLeafNode = ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"} + + {{leaf}} + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + # For access to system account. + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } +` + +var jsLeafFrag = ` + leaf { + remotes [ + { urls: [ %s ], deny_exports: ["$JS.API.>"] } + { urls: [ %s ], account: "$SYS" } + ] + } +` + +func (c *cluster) createLeafNodes(clusterName string, numServers int) *cluster { + const startClusterPort = 22111 + return c.createLeafNodesWithStartPort(clusterName, numServers, startClusterPort) +} + +func (c *cluster) createLeafNodesWithStartPort(clusterName string, numServers int, portStart int) *cluster { + + // Create our leafnode cluster template first. + var lns, lnss []string + for _, s := range c.servers { + ln := s.getOpts().LeafNode + lns = append(lns, fmt.Sprintf("nats://%s:%d", ln.Host, ln.Port)) + lnss = append(lnss, fmt.Sprintf("nats://admin:s3cr3t!@%s:%d", ln.Host, ln.Port)) + } + lnc := strings.Join(lns, ", ") + lnsc := strings.Join(lnss, ", ") + lconf := fmt.Sprintf(jsLeafFrag, lnc, lnsc) + tmpl := strings.Replace(jsClusterTemplWithLeafNode, "{{leaf}}", lconf, 1) + + pre := clusterName + "-" + lc := createJetStreamCluster(c.t, tmpl, clusterName, pre, numServers, portStart, false) + for _, s := range lc.servers { + checkLeafNodeConnectedCount(c.t, s, 2) + } + return lc +} + // Adjust limits for the given account. func (c *cluster) updateLimits(account string, newLimits *JetStreamAccountLimits) { c.t.Helper()