mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Add in more extensive test on extending hubs with leafnode clusters
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -5518,6 +5518,32 @@ func TestJetStreamClusterMixedMode(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestJetStreamClusterLeafnodeSpokes(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "HUB", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
lnc1 := c.createLeafNodesWithStartPort("R1", 3, 22110)
|
||||
defer lnc1.shutdown()
|
||||
|
||||
lnc2 := c.createLeafNodesWithStartPort("R2", 3, 22120)
|
||||
defer lnc2.shutdown()
|
||||
|
||||
lnc3 := c.createLeafNodesWithStartPort("R3", 3, 22130)
|
||||
defer lnc3.shutdown()
|
||||
|
||||
// Wait on all peers.
|
||||
c.waitOnPeerCount(12)
|
||||
|
||||
// Make sure shrinking works.
|
||||
lnc3.shutdown()
|
||||
c.waitOnPeerCount(9)
|
||||
|
||||
lnc3 = c.createLeafNodesWithStartPort("LNC3", 3, 22130)
|
||||
defer lnc3.shutdown()
|
||||
|
||||
c.waitOnPeerCount(12)
|
||||
}
|
||||
|
||||
func TestJetStreamClusterSuperClusterAndLeafNodesWithSharedSystemAccount(t *testing.T) {
|
||||
sc := createJetStreamSuperCluster(t, 3, 2)
|
||||
defer sc.shutdown()
|
||||
@@ -5969,51 +5995,10 @@ func createJetStreamSuperCluster(t *testing.T, numServersPer, numClusters int) *
|
||||
return sc
|
||||
}
|
||||
|
||||
var jsClusterTemplWithLeafNode = `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
|
||||
|
||||
{{leaf}}
|
||||
|
||||
cluster {
|
||||
name: %s
|
||||
listen: 127.0.0.1:%d
|
||||
routes = [%s]
|
||||
}
|
||||
|
||||
# For access to system account.
|
||||
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
||||
`
|
||||
|
||||
var jsLeafFrag = `
|
||||
leaf {
|
||||
remotes [
|
||||
{ urls: [ %s ], deny_exports: ["$JS.API.>"] }
|
||||
{ urls: [ %s ], account: "$SYS" }
|
||||
]
|
||||
}
|
||||
`
|
||||
|
||||
func (sc *supercluster) createLeafNodes(clusterName string, numServers int) *cluster {
|
||||
// Create our leafnode cluster template first.
|
||||
c := sc.randomCluster()
|
||||
var lns, lnss []string
|
||||
for _, s := range c.servers {
|
||||
ln := s.getOpts().LeafNode
|
||||
lns = append(lns, fmt.Sprintf("nats://%s:%d", ln.Host, ln.Port))
|
||||
lnss = append(lnss, fmt.Sprintf("nats://admin:s3cr3t!@%s:%d", ln.Host, ln.Port))
|
||||
}
|
||||
lnc := strings.Join(lns, ", ")
|
||||
lnsc := strings.Join(lnss, ", ")
|
||||
lconf := fmt.Sprintf(jsLeafFrag, lnc, lnsc)
|
||||
tmpl := strings.Replace(jsClusterTemplWithLeafNode, "{{leaf}}", lconf, 1)
|
||||
|
||||
lc := createJetStreamCluster(sc.t, tmpl, clusterName, numServers, false)
|
||||
for _, s := range lc.servers {
|
||||
checkLeafNodeConnectedCount(sc.t, s, 2)
|
||||
}
|
||||
return lc
|
||||
return c.createLeafNodes(clusterName, numServers)
|
||||
}
|
||||
|
||||
func (sc *supercluster) leader() *Server {
|
||||
@@ -6194,19 +6179,19 @@ func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers
|
||||
}
|
||||
|
||||
func createJetStreamClusterWithTemplate(t *testing.T, tmpl string, clusterName string, numServers int) *cluster {
|
||||
return createJetStreamCluster(t, tmpl, clusterName, numServers, true)
|
||||
const startClusterPort = 22332
|
||||
return createJetStreamCluster(t, tmpl, clusterName, _EMPTY_, numServers, startClusterPort, true)
|
||||
}
|
||||
|
||||
func createJetStreamCluster(t *testing.T, tmpl string, clusterName string, numServers int, waitOnReady bool) *cluster {
|
||||
func createJetStreamCluster(t *testing.T, tmpl string, clusterName string, snPre string, numServers int, portStart int, waitOnReady bool) *cluster {
|
||||
t.Helper()
|
||||
if clusterName == "" || numServers < 1 {
|
||||
t.Fatalf("Bad params")
|
||||
}
|
||||
const startClusterPort = 22332
|
||||
|
||||
// Build out the routes that will be shared with all configs.
|
||||
var routes []string
|
||||
for cp := startClusterPort; cp < startClusterPort+numServers; cp++ {
|
||||
for cp := portStart; cp < portStart+numServers; cp++ {
|
||||
routes = append(routes, fmt.Sprintf("nats-route://127.0.0.1:%d", cp))
|
||||
}
|
||||
routeConfig := strings.Join(routes, ",")
|
||||
@@ -6214,9 +6199,9 @@ func createJetStreamCluster(t *testing.T, tmpl string, clusterName string, numSe
|
||||
// Go ahead and build configurations and start servers.
|
||||
c := &cluster{servers: make([]*Server, 0, numServers), opts: make([]*Options, 0, numServers), name: clusterName}
|
||||
|
||||
for cp := startClusterPort; cp < startClusterPort+numServers; cp++ {
|
||||
for cp := portStart; cp < portStart+numServers; cp++ {
|
||||
storeDir := createDir(t, JetStreamStoreDir)
|
||||
sn := fmt.Sprintf("S-%d", cp-startClusterPort+1)
|
||||
sn := fmt.Sprintf("%sS-%d", snPre, cp-portStart+1)
|
||||
conf := fmt.Sprintf(tmpl, sn, storeDir, clusterName, cp, routeConfig)
|
||||
s, o := RunServerWithConfig(createConfFile(t, []byte(conf)))
|
||||
c.servers = append(c.servers, s)
|
||||
@@ -6246,6 +6231,59 @@ func (c *cluster) addInNewServer() *Server {
|
||||
return s
|
||||
}
|
||||
|
||||
var jsClusterTemplWithLeafNode = `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
|
||||
|
||||
{{leaf}}
|
||||
|
||||
cluster {
|
||||
name: %s
|
||||
listen: 127.0.0.1:%d
|
||||
routes = [%s]
|
||||
}
|
||||
|
||||
# For access to system account.
|
||||
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
||||
`
|
||||
|
||||
var jsLeafFrag = `
|
||||
leaf {
|
||||
remotes [
|
||||
{ urls: [ %s ], deny_exports: ["$JS.API.>"] }
|
||||
{ urls: [ %s ], account: "$SYS" }
|
||||
]
|
||||
}
|
||||
`
|
||||
|
||||
func (c *cluster) createLeafNodes(clusterName string, numServers int) *cluster {
|
||||
const startClusterPort = 22111
|
||||
return c.createLeafNodesWithStartPort(clusterName, numServers, startClusterPort)
|
||||
}
|
||||
|
||||
func (c *cluster) createLeafNodesWithStartPort(clusterName string, numServers int, portStart int) *cluster {
|
||||
|
||||
// Create our leafnode cluster template first.
|
||||
var lns, lnss []string
|
||||
for _, s := range c.servers {
|
||||
ln := s.getOpts().LeafNode
|
||||
lns = append(lns, fmt.Sprintf("nats://%s:%d", ln.Host, ln.Port))
|
||||
lnss = append(lnss, fmt.Sprintf("nats://admin:s3cr3t!@%s:%d", ln.Host, ln.Port))
|
||||
}
|
||||
lnc := strings.Join(lns, ", ")
|
||||
lnsc := strings.Join(lnss, ", ")
|
||||
lconf := fmt.Sprintf(jsLeafFrag, lnc, lnsc)
|
||||
tmpl := strings.Replace(jsClusterTemplWithLeafNode, "{{leaf}}", lconf, 1)
|
||||
|
||||
pre := clusterName + "-"
|
||||
lc := createJetStreamCluster(c.t, tmpl, clusterName, pre, numServers, portStart, false)
|
||||
for _, s := range lc.servers {
|
||||
checkLeafNodeConnectedCount(c.t, s, 2)
|
||||
}
|
||||
return lc
|
||||
}
|
||||
|
||||
// Adjust limits for the given account.
|
||||
func (c *cluster) updateLimits(account string, newLimits *JetStreamAccountLimits) {
|
||||
c.t.Helper()
|
||||
|
||||
Reference in New Issue
Block a user