mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
This adds ability to have a single node server with a system leafnode expand an existing JetStream cluster domain.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -219,7 +219,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
|
||||
}
|
||||
|
||||
// If we are in clustered mode go ahead and start the meta controller.
|
||||
if !s.standAloneMode() {
|
||||
if !s.standAloneMode() || s.hasSolicitLeafNodeSystemShare() {
|
||||
if err := s.enableJetStreamClustering(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -228,6 +228,22 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// This will check if we have the a solicited leafnode that shares the system account.
|
||||
func (s *Server) hasSolicitLeafNodeSystemShare() bool {
|
||||
sysAcc := s.SystemAccount().GetName()
|
||||
for _, r := range s.getOpts().LeafNode.Remotes {
|
||||
if r.LocalAccount == sysAcc {
|
||||
for _, denySub := range r.DenyImports {
|
||||
if subjectIsSubsetMatch(denySub, raftAllSubj) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *Server) updateJetStreamInfoStatus(enabled bool) {
|
||||
s.mu.Lock()
|
||||
s.info.JetStream = enabled
|
||||
|
||||
@@ -460,8 +460,13 @@ func (s *Server) enableJetStreamClustering() error {
|
||||
s.Noticef("Starting JetStream cluster")
|
||||
// We need to determine if we have a stable cluster name and expected number of servers.
|
||||
s.Debugf("JetStream cluster checking for stable cluster name and peers")
|
||||
if s.isClusterNameDynamic() || s.configuredRoutes() == 0 {
|
||||
return errors.New("JetStream cluster requires cluster name and explicit routes")
|
||||
|
||||
hasLeafNodeSystemShare := s.hasSolicitLeafNodeSystemShare()
|
||||
if s.isClusterNameDynamic() && !hasLeafNodeSystemShare {
|
||||
return errors.New("JetStream cluster requires cluster name")
|
||||
}
|
||||
if s.configuredRoutes() == 0 && !hasLeafNodeSystemShare {
|
||||
return errors.New("JetStream cluster requires configured routes or solicited leafnode for the system account")
|
||||
}
|
||||
|
||||
return js.setupMetaGroup()
|
||||
@@ -489,22 +494,7 @@ func (js *jetStream) setupMetaGroup() error {
|
||||
// If we are soliciting leafnode connections and we are sharing a system account
|
||||
// we want to move to observer mode so that we extend the solicited cluster or supercluster
|
||||
// but do not form our own.
|
||||
if ln := s.getOpts().LeafNode; len(ln.Remotes) > 0 {
|
||||
sys := s.SystemAccount().GetName()
|
||||
ENDFOR:
|
||||
for _, r := range ln.Remotes {
|
||||
if r.LocalAccount == sys {
|
||||
for _, denySub := range r.DenyImports {
|
||||
if subjectIsSubsetMatch(denySub, raftAllSubj) {
|
||||
// raft group is denied, hence there won't be anything to observe
|
||||
break ENDFOR
|
||||
}
|
||||
}
|
||||
cfg.Observer = true
|
||||
break ENDFOR
|
||||
}
|
||||
}
|
||||
}
|
||||
cfg.Observer = s.hasSolicitLeafNodeSystemShare()
|
||||
|
||||
var bootstrap bool
|
||||
if _, err := readPeerState(storeDir); err != nil {
|
||||
|
||||
@@ -5629,6 +5629,20 @@ func TestJetStreamClusterSuperClusterAndLeafNodesWithSharedSystemAccount(t *test
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterSuperClusterAndSingleLeafNodeWithSharedSystemAccount(t *testing.T) {
|
||||
sc := createJetStreamSuperCluster(t, 3, 2)
|
||||
defer sc.shutdown()
|
||||
|
||||
ln := sc.createSingleLeafNode()
|
||||
defer ln.Shutdown()
|
||||
|
||||
// We want to make sure there is only one leader and its always in the supercluster.
|
||||
sc.waitOnLeader()
|
||||
|
||||
// leafnodes should have been added into the overall peer count.
|
||||
sc.waitOnPeerCount(7)
|
||||
}
|
||||
|
||||
func TestJetStreamClusterLeafDifferentAccounts(t *testing.T) {
|
||||
c := createJetStreamCluster(t, jsClusterAccountsTempl, "HUB", _EMPTY_, 2, 33133, false)
|
||||
defer c.shutdown()
|
||||
@@ -6172,6 +6186,11 @@ func (sc *supercluster) createLeafNodes(clusterName string, numServers int) *clu
|
||||
return c.createLeafNodes(clusterName, numServers)
|
||||
}
|
||||
|
||||
func (sc *supercluster) createSingleLeafNode() *Server {
|
||||
c := sc.randomCluster()
|
||||
return c.createLeafNode()
|
||||
}
|
||||
|
||||
func (sc *supercluster) leader() *Server {
|
||||
for _, c := range sc.clusters {
|
||||
if leader := c.leader(); leader != nil {
|
||||
@@ -6423,6 +6442,17 @@ var jsClusterTemplWithLeafNode = `
|
||||
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
||||
`
|
||||
|
||||
var jsClusterTemplWithSingleLeafNode = `
|
||||
listen: 127.0.0.1:-1
|
||||
server_name: %s
|
||||
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
|
||||
|
||||
{{leaf}}
|
||||
|
||||
# For access to system account.
|
||||
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
|
||||
`
|
||||
|
||||
var jsLeafFrag = `
|
||||
leaf {
|
||||
remotes [
|
||||
@@ -6441,7 +6471,17 @@ func (c *cluster) createLeafNodesWithStartPort(clusterName string, numServers in
|
||||
return c.createLeafNodesWithStartPortAndMQTT(clusterName, numServers, portStart, _EMPTY_)
|
||||
}
|
||||
|
||||
func (c *cluster) createLeafNodesWithStartPortAndMQTT(clusterName string, numServers int, portStart int, mqtt string) *cluster {
|
||||
func (c *cluster) createLeafNode() *Server {
|
||||
tmpl := c.createLeafSolicit(jsClusterTemplWithSingleLeafNode)
|
||||
conf := fmt.Sprintf(tmpl, "LNS", createDir(c.t, JetStreamStoreDir))
|
||||
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
|
||||
c.servers = append(c.servers, s)
|
||||
c.opts = append(c.opts, o)
|
||||
return s
|
||||
}
|
||||
|
||||
// Helper to generate the leaf solicit configs.
|
||||
func (c *cluster) createLeafSolicit(tmpl string) string {
|
||||
// Create our leafnode cluster template first.
|
||||
var lns, lnss []string
|
||||
for _, s := range c.servers {
|
||||
@@ -6452,7 +6492,12 @@ func (c *cluster) createLeafNodesWithStartPortAndMQTT(clusterName string, numSer
|
||||
lnc := strings.Join(lns, ", ")
|
||||
lnsc := strings.Join(lnss, ", ")
|
||||
lconf := fmt.Sprintf(jsLeafFrag, lnc, lnsc)
|
||||
tmpl := strings.Replace(jsClusterTemplWithLeafNode, "{{leaf}}", lconf, 1)
|
||||
return strings.Replace(tmpl, "{{leaf}}", lconf, 1)
|
||||
}
|
||||
|
||||
func (c *cluster) createLeafNodesWithStartPortAndMQTT(clusterName string, numServers int, portStart int, mqtt string) *cluster {
|
||||
// Create our leafnode cluster template first.
|
||||
tmpl := c.createLeafSolicit(jsClusterTemplWithLeafNode)
|
||||
tmpl = strings.Replace(tmpl, "{{mqtt}}", mqtt, 1)
|
||||
|
||||
pre := clusterName + "-"
|
||||
|
||||
@@ -524,7 +524,7 @@ func (s *Server) setClusterName(name string) {
|
||||
|
||||
// Return whether the cluster name is dynamic.
|
||||
func (s *Server) isClusterNameDynamic() bool {
|
||||
return s.getOpts().Cluster.Name == ""
|
||||
return s.getOpts().Cluster.Name == _EMPTY_
|
||||
}
|
||||
|
||||
// ClientURL returns the URL used to connect clients. Helpful in testing
|
||||
|
||||
Reference in New Issue
Block a user