From 3e8b66286d705df59197ff2660666c35c316140a Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Thu, 16 Dec 2021 16:53:20 -0500 Subject: [PATCH] Js leaf deny (#2693) Along a leaf node connection, unless the system account is shared AND the JetStream domain name is identical, the default JetStream traffic (without a domain set) will be denied. As a consequence, any client that wants to access a domain that is not the one of the server it is connected to must specify a domain name. Affected by this change are setups where: a leaf node had no local JetStream OR the server the leaf node connected to had no local JetStream. One of the two accounts that are connected via a leaf node remote must have no JetStream enabled. The side that does not have JetStream enabled will lose JetStream access and its clients must set `nats.Domain` manually. For workarounds on how to restore the old behavior, look at: https://github.com/nats-io/nats-server/pull/2693#issuecomment-996212582 New config values added: `default_js_domain` is a mapping from account to domain, settable when JetStream is not enabled in an account. `extension_hint` is a hint for a non-clustered server to start in clustered mode (and be usable to extend) `js_domain` is a way to set the JetStream domain to use for mqtt. 
Signed-off-by: Matthias Hanel --- server/auth.go | 26 +- server/client.go | 53 +- server/jetstream.go | 88 ++- server/jetstream_api.go | 2 + server/jetstream_cluster.go | 51 +- server/jetstream_cluster_test.go | 219 +++++-- server/jetstream_test.go | 13 +- server/leafnode.go | 382 ++++++------ server/leafnode_test.go | 973 +++++++++++++++++++++++++++++++ server/mqtt.go | 73 ++- server/mqtt_test.go | 205 ++++--- server/opts.go | 163 +++--- server/raft.go | 37 +- server/reload.go | 2 +- server/server.go | 21 + server/server_test.go | 2 +- server/sublist_test.go | 4 + 17 files changed, 1841 insertions(+), 473 deletions(-) diff --git a/server/auth.go b/server/auth.go index 05d35afa..f2e76692 100644 --- a/server/auth.go +++ b/server/auth.go @@ -671,6 +671,28 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) boo if err := c.RegisterNkeyUser(nkey); err != nil { return false } + + // Warn about JetStream restrictions + if c.perms != nil { + deniedPub := []string{} + deniedSub := []string{} + for _, sub := range denyAllJs { + if c.perms.pub.deny != nil { + if r := c.perms.pub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 { + deniedPub = append(deniedPub, sub) + } + } + if c.perms.sub.deny != nil { + if r := c.perms.sub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 { + deniedSub = append(deniedSub, sub) + } + } + } + if len(deniedPub) > 0 || len(deniedSub) > 0 { + c.Noticef("Connected %s has JetStream denied on pub: %v sub: %v", c.kindString(), deniedPub, deniedSub) + } + } + // Hold onto the user's public key. 
c.mu.Lock() c.pubKey = juc.Subject @@ -683,8 +705,8 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) boo acc.mu.RLock() c.Debugf("Authenticated JWT: %s %q (claim-name: %q, claim-tags: %q) "+ - "signed with %q by Account %q (claim-name: %q, claim-tags: %q) signed with %q", - c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer) + "signed with %q by Account %q (claim-name: %q, claim-tags: %q) signed with %q has mappings %t accused %p", + c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer, acc.hasMappings(), acc) acc.mu.RUnlock() return true } diff --git a/server/client.go b/server/client.go index 06098344..1d889fad 100644 --- a/server/client.go +++ b/server/client.go @@ -942,24 +942,65 @@ func (c *client) setPermissions(perms *Permissions) { } } +type denyType int + +const ( + pub = denyType(iota + 1) + sub + both +) + // Merge client.perms structure with additional pub deny permissions // Lock is held on entry. 
-func (c *client) mergePubDenyPermissions(denyPubs []string) { +func (c *client) mergeDenyPermissions(what denyType, denyPubs []string) { if len(denyPubs) == 0 { return } if c.perms == nil { c.perms = &permissions{} } - if c.perms.pub.deny == nil { - c.perms.pub.deny = NewSublistWithCache() + var perms []*perm + switch what { + case pub: + perms = []*perm{&c.perms.pub} + case sub: + perms = []*perm{&c.perms.sub} + case both: + perms = []*perm{&c.perms.pub, &c.perms.sub} } - for _, pubSubject := range denyPubs { - sub := &subscription{subject: []byte(pubSubject)} - c.perms.pub.deny.Insert(sub) + for _, p := range perms { + if p.deny == nil { + p.deny = NewSublistWithCache() + } + FOR_DENY: + for _, subj := range denyPubs { + r := p.deny.Match(subj) + for _, v := range r.qsubs { + for _, s := range v { + if string(s.subject) == subj { + continue FOR_DENY + } + } + } + for _, s := range r.psubs { + if string(s.subject) == subj { + continue FOR_DENY + } + } + sub := &subscription{subject: []byte(subj)} + p.deny.Insert(sub) + } } } +// Merge client.perms structure with additional pub deny permissions +// Client lock must not be held on entry +func (c *client) mergeDenyPermissionsLocked(what denyType, denyPubs []string) { + c.mu.Lock() + c.mergeDenyPermissions(what, denyPubs) + c.mu.Unlock() +} + // Check to see if we have an expiration for the user JWT via base claims. // FIXME(dlc) - Clear on connect with new JWT. 
func (c *client) setExpiration(claims *jwt.ClaimsData, validFor time.Duration) { diff --git a/server/jetstream.go b/server/jetstream.go index 6a6f7cfb..6a784681 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -365,8 +365,16 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { return err } + canExtend := s.canExtendOtherDomain() + standAlone := s.standAloneMode() + if standAlone && canExtend && s.getOpts().JetStreamExtHint != jsWillExtend { + canExtend = false + s.Noticef("Standalone server started in clustered mode do not support extending domains") + s.Noticef(`Manually disable standalone mode by setting the JetStream Option "extension_hint: %s"`, jsWillExtend) + } + // If we are in clustered mode go ahead and start the meta controller. - if !s.standAloneMode() || s.wantsToExtendOtherDomain() { + if !standAlone || canExtend { if err := s.enableJetStreamClustering(); err != nil { return err } @@ -375,15 +383,13 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { return nil } +const jsNoExtend = "no_extend" +const jsWillExtend = "will_extend" + // This will check if we have a solicited leafnode that shares the system account -// and we are not our own domain and we are not denying subjects that would prevent -// extending JetStream. -func (s *Server) wantsToExtendOtherDomain() bool { +// and extension is not manually disabled +func (s *Server) canExtendOtherDomain() bool { opts := s.getOpts() - // If we have a domain name set we want to be our own domain. - if opts.JetStreamDomain != _EMPTY_ { - return false - } sysAcc := s.SystemAccount().GetName() for _, r := range opts.LeafNode.Remotes { if r.LocalAccount == sysAcc { @@ -730,20 +736,6 @@ func (s *Server) JetStreamEnabledForDomain() bool { return jsFound } -// Helper to see if we have a non-empty domain defined in any server we know about. 
-func (s *Server) jetStreamHasDomainConfigured() bool { - var found bool - s.nodeToInfo.Range(func(k, v interface{}) bool { - if v.(nodeInfo).domain != _EMPTY_ { - found = true - return false - } - return true - }) - - return found -} - // Will migrate off ephemerals if possible. // This means parent stream needs to be replicated. func (s *Server) migrateEphemerals() { @@ -2189,6 +2181,51 @@ func (s *Server) resourcesExeededError() { // For validating options. func validateJetStreamOptions(o *Options) error { + // in non operator mode, the account names need to be configured + if len(o.JsAccDefaultDomain) > 0 { + if len(o.TrustedOperators) == 0 { + for a, domain := range o.JsAccDefaultDomain { + found := false + if isReservedAccount(a) { + found = true + } else { + for _, acc := range o.Accounts { + if a == acc.GetName() { + if acc.jsLimits != nil && domain != _EMPTY_ { + return fmt.Errorf("default_js_domain contains account name %q with enabled JetStream", a) + } + found = true + break + } + } + } + if !found { + return fmt.Errorf("in non operator mode, `default_js_domain` references non existing account %q", a) + } + } + } else { + for a := range o.JsAccDefaultDomain { + if !nkeys.IsValidPublicAccountKey(a) { + return fmt.Errorf("default_js_domain contains account name %q, which is not a valid public account nkey", a) + } + } + } + for a, d := range o.JsAccDefaultDomain { + sacc := DEFAULT_SYSTEM_ACCOUNT + if o.SystemAccount != _EMPTY_ { + sacc = o.SystemAccount + } + if a == sacc { + return fmt.Errorf("system account %q can not be in default_js_domain", a) + } + if d == _EMPTY_ { + continue + } + if sub := fmt.Sprintf(jsDomainAPI, d); !IsValidSubject(sub) { + return fmt.Errorf("default_js_domain contains account %q with invalid domain name %q", a, d) + } + } + } if o.JetStreamDomain != _EMPTY_ { if subj := fmt.Sprintf(jsDomainAPI, o.JetStreamDomain); !IsValidSubject(subj) { return fmt.Errorf("invalid domain name: derived %q is not a valid subject", subj) @@ 
-2209,5 +2246,12 @@ func validateJetStreamOptions(o *Options) error { return fmt.Errorf("jetstream cluster requires `cluster.name` to be set") } + h := strings.ToLower(o.JetStreamExtHint) + switch h { + case jsWillExtend, jsNoExtend, _EMPTY_: + o.JetStreamExtHint = h + default: + return fmt.Errorf("expected 'no_extend' for string value, got '%s'", h) + } return nil } diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 3e6471fb..b6d4e4a4 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -256,6 +256,8 @@ const ( JSAuditAdvisory = "$JS.EVENT.ADVISORY.API" ) +var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI} + // JSMaxDescription is the maximum description length for streams and consumers. const JSMaxDescriptionLen = 4 * 1024 diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 77d56622..b97cb6ab 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -468,7 +468,7 @@ func (s *Server) enableJetStreamClustering() error { // We need to determine if we have a stable cluster name and expected number of servers. s.Debugf("JetStream cluster checking for stable cluster name and peers") - hasLeafNodeSystemShare := s.wantsToExtendOtherDomain() + hasLeafNodeSystemShare := s.canExtendOtherDomain() if s.isClusterNameDynamic() && !hasLeafNodeSystemShare { return errors.New("JetStream cluster requires cluster name") } @@ -498,13 +498,12 @@ func (js *jetStream) setupMetaGroup() error { cfg := &RaftConfig{Name: defaultMetaGroupName, Store: storeDir, Log: fs} - // If we are soliciting leafnode connections and we are sharing a system account - // we want to move to observer mode so that we extend the solicited cluster or supercluster - // but do not form our own. 
- cfg.Observer = s.wantsToExtendOtherDomain() + // If we are soliciting leafnode connections and we are sharing a system account and do not disable it with a hint, + // we want to move to observer mode so that we extend the solicited cluster or supercluster but do not form our own. + cfg.Observer = s.canExtendOtherDomain() && s.opts.JetStreamExtHint != jsNoExtend var bootstrap bool - if _, err := readPeerState(storeDir); err != nil { + if ps, err := readPeerState(storeDir); err != nil { s.Noticef("JetStream cluster bootstrapping") bootstrap = true peers := s.ActivePeers() @@ -512,8 +511,33 @@ func (js *jetStream) setupMetaGroup() error { if err := s.bootstrapRaftNode(cfg, peers, false); err != nil { return err } + if cfg.Observer { + s.Noticef("Turning JetStream metadata controller Observer Mode on") + } } else { s.Noticef("JetStream cluster recovering state") + // correlate the value of observer with observations from a previous run. + if cfg.Observer { + switch ps.domainExt { + case extExtended: + s.Noticef("Keeping JetStream metadata controller Observer Mode on - due to previous contact") + case extNotExtended: + s.Noticef("Turning JetStream metadata controller Observer Mode off - due to previous contact") + cfg.Observer = false + case extUndetermined: + s.Noticef("Turning JetStream metadata controller Observer Mode on - no previous contact") + s.Noticef("In cases where JetStream will not be extended") + s.Noticef("and waiting for leader election until first contact is not acceptable,") + s.Noticef(`manually disable Observer Mode by setting the JetStream Option "extension_hint: %s"`, jsNoExtend) + } + } else { + // To track possible configuration changes, responsible for an altered value of cfg.Observer, + // set extension state to undetermined. + ps.domainExt = extUndetermined + if err := writePeerState(storeDir, ps); err != nil { + return err + } + } } // Start up our meta node. 
@@ -829,8 +853,9 @@ func (js *jetStream) monitorCluster() { // If we are here we do not have a leader and we did not have a previous one, so cold start. // Check to see if we can adjust our cluster size down iff we are in mixed mode and we have // seen a total that is what our original estimate was. - if js, total := s.trackedJetStreamServers(); js < total && total >= n.ClusterSize() { - s.Noticef("Adjusting JetStream expected peer set size to %d from original %d", js, n.ClusterSize()) + cs := n.ClusterSize() + if js, total := s.trackedJetStreamServers(); js < total && total >= cs && js != cs { + s.Noticef("Adjusting JetStream expected peer set size to %d from original %d", js, cs) n.AdjustBootClusterSize(js) } } @@ -3442,7 +3467,15 @@ func (js *jetStream) stopUpdatesSub() { func (js *jetStream) processLeaderChange(isLeader bool) { if isLeader { - js.srv.Noticef("JetStream cluster new metadata leader") + js.srv.Noticef("Self is new JetStream cluster metadata leader") + } else if node := js.getMetaGroup().GroupLeader(); node == _EMPTY_ { + js.srv.Noticef("JetStream cluster no metadata leader") + } else if srv := js.srv.serverNameForNode(node); srv == _EMPTY_ { + js.srv.Noticef("JetStream cluster new remote metadata leader") + } else if clst := js.srv.clusterNameForNode(node); clst == _EMPTY_ { + js.srv.Noticef("JetStream cluster new metadata leader: %s", srv) + } else { + js.srv.Noticef("JetStream cluster new metadata leader: %s/%s", srv, clst) } js.mu.Lock() diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index d945d29f..9c072073 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -6154,13 +6154,13 @@ func TestJetStreamClusterLeafnodeSpokes(t *testing.T) { c := createJetStreamCluster(t, jsClusterTempl, "HUB", _EMPTY_, 3, 22020, false) defer c.shutdown() - lnc1 := c.createLeafNodesWithStartPort("R1", 3, 22110) + lnc1 := c.createLeafNodesWithStartPortAndDomain("R1", 3, 22110, _EMPTY_) defer 
lnc1.shutdown() - lnc2 := c.createLeafNodesWithStartPort("R2", 3, 22120) + lnc2 := c.createLeafNodesWithStartPortAndDomain("R2", 3, 22120, _EMPTY_) defer lnc2.shutdown() - lnc3 := c.createLeafNodesWithStartPort("R3", 3, 22130) + lnc3 := c.createLeafNodesWithStartPortAndDomain("R3", 3, 22130, _EMPTY_) defer lnc3.shutdown() // Wait on all peers. @@ -6170,13 +6170,13 @@ func TestJetStreamClusterLeafnodeSpokes(t *testing.T) { lnc3.shutdown() c.waitOnPeerCount(9) - lnc3 = c.createLeafNodesWithStartPort("LNC3", 3, 22130) + lnc3 = c.createLeafNodesWithStartPortAndDomain("LNC3", 3, 22130, _EMPTY_) defer lnc3.shutdown() c.waitOnPeerCount(12) } -func TestJetStreamClusterSuperClusterAndLeafNodesWithSharedSystemAccount(t *testing.T) { +func TestJetStreamClusterSuperClusterAndLeafNodesWithSharedSystemAccountAndSameDomain(t *testing.T) { sc := createJetStreamSuperCluster(t, 3, 2) defer sc.shutdown() @@ -6206,29 +6206,20 @@ func TestJetStreamClusterSuperClusterAndLeafNodesWithSharedSystemAccount(t *test for _, ln := range ls.leafs { ln.mu.Lock() if ln.leaf.remote.RemoteLeafOpts.LocalAccount == gacc { - // Make sure we have the $JS.API denied in both. - for _, dsubj := range ln.leaf.remote.RemoteLeafOpts.DenyExports { - if dsubj == jsAllAPI { - hasDE = true - break - } - } - for _, dsubj := range ln.leaf.remote.RemoteLeafOpts.DenyImports { - if dsubj == jsAllAPI { - hasDI = true - break - } - } + re := ln.perms.pub.deny.Match(jsAllAPI) + hasDE = len(re.psubs)+len(re.qsubs) > 0 + rs := ln.perms.sub.deny.Match(jsAllAPI) + hasDI = len(rs.psubs)+len(rs.qsubs) > 0 } ln.mu.Unlock() } ls.mu.Unlock() if !hasDE { - t.Fatalf("No deny export on system account") + t.Fatalf("No deny export on global account") } if !hasDI { - t.Fatalf("No deny import on system account") + t.Fatalf("No deny import on global account") } // Make a stream by connecting to the leafnode cluster. Make sure placement is correct. 
@@ -6264,11 +6255,85 @@ func TestJetStreamClusterSuperClusterAndLeafNodesWithSharedSystemAccount(t *test } } +func TestJetStreamClusterSuperClusterAndLeafNodesWithSharedSystemAccountAndDifferentDomain(t *testing.T) { + sc := createJetStreamSuperCluster(t, 3, 2) + defer sc.shutdown() + + lnc := sc.createLeafNodesWithDomain("LNC", 2, "LEAFDOMAIN") + defer lnc.shutdown() + + // We want to make sure there is only one leader and its always in the supercluster. + sc.waitOnLeader() + lnc.waitOnLeader() + + // even though system account is shared, because domains differ, + sc.waitOnPeerCount(6) + lnc.waitOnPeerCount(2) + + // Check here that we auto detect sharing system account as well and auto place the correct + // deny imports and exports. + ls := lnc.randomServer() + if ls == nil { + t.Fatalf("Expected a leafnode server, got none") + } + gacc := ls.globalAccount().GetName() + + ls.mu.Lock() + var hasDE, hasDI bool + for _, ln := range ls.leafs { + ln.mu.Lock() + if ln.leaf.remote.RemoteLeafOpts.LocalAccount == gacc { + re := ln.perms.pub.deny.Match(jsAllAPI) + hasDE = len(re.psubs)+len(re.qsubs) > 0 + rs := ln.perms.sub.deny.Match(jsAllAPI) + hasDI = len(rs.psubs)+len(rs.qsubs) > 0 + } + ln.mu.Unlock() + } + ls.mu.Unlock() + + if !hasDE { + t.Fatalf("No deny export on global account") + } + if !hasDI { + t.Fatalf("No deny import on global account") + } + + // Make a stream by connecting to the leafnode cluster. Make sure placement is correct. 
+ // Client based API + nc, js := jsClientConnect(t, lnc.randomServer()) + defer nc.Close() + + si, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar"}, + Replicas: 2, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.Cluster.Name != "LNC" { + t.Fatalf("Expected default placement to be %q, got %q", "LNC", si.Cluster.Name) + } + + // Now make sure placement does not works for cluster in different domain + pcn := "C2" + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST2", + Subjects: []string{"baz"}, + Replicas: 2, + Placement: &nats.Placement{Cluster: pcn}, + }) + if err == nil || !strings.Contains(err.Error(), "insufficient resources") { + t.Fatalf("Expected insufficient resources, got: %v", err) + } +} + func TestJetStreamClusterSuperClusterAndSingleLeafNodeWithSharedSystemAccount(t *testing.T) { sc := createJetStreamSuperCluster(t, 3, 2) defer sc.shutdown() - ln := sc.createSingleLeafNode() + ln := sc.createSingleLeafNode(true) defer ln.Shutdown() // We want to make sure there is only one leader and its always in the supercluster. @@ -6341,7 +6406,7 @@ func TestJetStreamClusterLeafNodeDenyNoDupe(t *testing.T) { // Grab the correct remote. for _, remote := range vz.LeafNode.Remotes { if remote.LocalAccount == ln.SystemAccount().Name { - if len(remote.Deny.Exports) > 3 { // denyAll := []string{jscAllSubj, raftAllSubj, jsAllAPI} + if remote.Deny != nil && len(remote.Deny.Exports) > 3 { // denyAll := []string{jscAllSubj, raftAllSubj, jsAllAPI} t.Fatalf("Dupe entries found: %+v", remote.Deny) } break @@ -6351,7 +6416,7 @@ func TestJetStreamClusterLeafNodeDenyNoDupe(t *testing.T) { // Multiple JS domains. 
func TestJetStreamClusterSingleLeafNodeWithoutSharedSystemAccount(t *testing.T) { - c := createJetStreamCluster(t, jsClusterAccountsTempl, "HUB", _EMPTY_, 3, 14333, true) + c := createJetStreamCluster(t, strings.Replace(jsClusterAccountsTempl, "store_dir", "domain: CORE, store_dir", 1), "HUB", _EMPTY_, 3, 14333, true) defer c.shutdown() ln := c.createSingleLeafNodeNoSystemAccount() @@ -6366,7 +6431,7 @@ func TestJetStreamClusterSingleLeafNodeWithoutSharedSystemAccount(t *testing.T) // Check behavior of the account without JS. // Normally this should fail since our local account is not enabled. However, since we are bridging // via the leafnode we expect this to work here. - nc, js := jsClientConnect(t, ln, nats.UserInfo("n", "p")) + nc, js := jsClientConnectEx(t, ln, "CORE", nats.UserInfo("n", "p")) defer nc.Close() si, err := js.AddStream(&nats.StreamConfig{ @@ -6546,14 +6611,14 @@ func TestJetStreamClusterSingleLeafNodeWithoutSharedSystemAccount(t *testing.T) // JetStream Domains func TestJetStreamClusterDomains(t *testing.T) { // This adds in domain config option to template. - // jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: CORE, store_dir: "%s"} tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: CORE, store_dir:", 1) c := createJetStreamCluster(t, tmpl, "CORE", _EMPTY_, 3, 12232, true) defer c.shutdown() // This leafnode is a single server with no domain but sharing the system account. // This extends the CORE domain through this leafnode. - ln := c.createLeafNodeWithTemplate("LN-SYS", jsClusterTemplWithSingleLeafNode) + ln := c.createLeafNodeWithTemplate("LN-SYS", + strings.ReplaceAll(jsClusterTemplWithSingleLeafNode, "store_dir:", "extension_hint: will_extend, domain: CORE, store_dir:")) defer ln.Shutdown() // This shows we have extended this system. 
@@ -6580,11 +6645,13 @@ func TestJetStreamClusterDomains(t *testing.T) { ln.mu.Unlock() remote.RLock() if remote.RemoteLeafOpts.LocalAccount == "$SYS" { - if len(remote.RemoteLeafOpts.DenyExports) != 3 { - t.Fatalf("Expected to have deny exports, got %+v", remote.RemoteLeafOpts.DenyExports) - } - if len(remote.RemoteLeafOpts.DenyImports) != 3 { - t.Fatalf("Expected to have deny imports, got %+v", remote.RemoteLeafOpts.DenyImports) + for _, s := range denyAllJs { + if r := ln.perms.pub.deny.Match(s); len(r.psubs) != 1 { + t.Fatalf("Expected to have deny permission for %s", s) + } + if r := ln.perms.sub.deny.Match(s); len(r.psubs) != 1 { + t.Fatalf("Expected to have deny permission for %s", s) + } } } remote.RUnlock() @@ -6755,7 +6822,7 @@ func TestJetStreamClusterDomainsWithNoJSHub(t *testing.T) { // Client based API - Connected to the core cluster with no JS but account has JS. s := c.randomServer() // Make sure the JS interest from the LNs has made it to this server. - checkSubInterest(t, s, "NOJS", "$JS.API.>", time.Second) + checkSubInterest(t, s, "NOJS", "$JS.SPOKE.API.>", time.Second) nc, _ := jsClientConnect(t, s, nats.UserInfo("nojs", "p")) defer nc.Close() @@ -6769,7 +6836,7 @@ func TestJetStreamClusterDomainsWithNoJSHub(t *testing.T) { } // Do by hand to make sure we only get one response. - sis := fmt.Sprintf(JSApiStreamCreateT, "TEST") + sis := fmt.Sprintf(strings.ReplaceAll(JSApiStreamCreateT, JSApiPrefix, "$JS.SPOKE.API"), "TEST") rs := nats.NewInbox() sub, _ := nc.SubscribeSync(rs) nc.PublishRequest(sis, rs, req) @@ -6779,7 +6846,8 @@ func TestJetStreamClusterDomainsWithNoJSHub(t *testing.T) { if nr, _, err := sub.Pending(); err != nil || nr != 1 { t.Fatalf("Expected 1 response, got %d and %v", nr, err) } - resp, _ := sub.NextMsg(time.Second) + resp, err := sub.NextMsg(time.Second) + require_NoError(t, err) // This StreamInfo should *not* have a domain set. // Do by hand until this makes it to the Go client. 
@@ -6881,6 +6949,9 @@ func TestJetStreamClusterDomainsAndSameNameSources(t *testing.T) { spoke2 := c.createLeafNodeWithTemplate("LN-SPOKE-2", tmpl) defer spoke2.Shutdown() + checkLeafNodeConnectedCount(t, spoke1, 2) + checkLeafNodeConnectedCount(t, spoke2, 2) + subjFor := func(s *Server) string { switch s { case spoke1: @@ -6997,8 +7068,9 @@ func TestJetStreamClusterDomainsAndSameNameSources(t *testing.T) { } } -// When a leafnode enables JS on an account that is not enabled on the remote cluster account this should -// still work. Early NGS beta testers etc. +// When a leafnode enables JS on an account that is not enabled on the remote cluster account this should fail +// Accessing a jet stream in a different availability domain requires the client provide a damain name, or +// the server having set up appropriate defaults (default_js_domain. tested in leafnode_test.go) func TestJetStreamClusterSingleLeafNodeEnablingJetStream(t *testing.T) { tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: HUB, store_dir:", 1) c := createJetStreamCluster(t, tmpl, "HUB", _EMPTY_, 3, 11322, true) @@ -7019,10 +7091,9 @@ func TestJetStreamClusterSingleLeafNodeEnablingJetStream(t *testing.T) { s := c.randomServer() nc, js = jsClientConnect(t, s, nats.UserInfo("nojs", "p")) defer nc.Close() - - if _, err := js.AccountInfo(); err != nil { - t.Fatalf("Unexpected error: %v", err) - } + _, err := js.AccountInfo() + // error is context deadline exceeded as the local account has no js and can't reach the remote one + require_True(t, err == context.DeadlineExceeded) } func TestJetStreamClusterLeafNodesWithoutJS(t *testing.T) { @@ -7053,14 +7124,14 @@ func TestJetStreamClusterLeafNodesWithoutJS(t *testing.T) { defer ln.Shutdown() // Check that we can access JS in the $G account on the cluster through the leafnode. - testJS(ln, "HUB", false) + testJS(ln, "HUB", true) ln.Shutdown() // Now create a leafnode cluster with No JS and make sure that works. 
lnc := c.createLeafNodesNoJS("LN-SYS-C-NOJS", 3) defer lnc.shutdown() - testJS(lnc.randomServer(), "HUB", false) + testJS(lnc.randomServer(), "HUB", true) lnc.shutdown() // Do mixed mode but with a JS config block that specifies domain and just sets it to disabled. @@ -7091,18 +7162,18 @@ func TestJetStreamClusterLeafNodesWithSameDomainNames(t *testing.T) { c.waitOnPeerCount(6) } -// Issue reported with superclusters and leafnodes where first few get next requests for pull susbcribers +// Issue reported with superclusters and leafnodes where first few get next requests for pull subscribers // have the wrong subject. func TestJetStreamClusterSuperClusterGetNextRewrite(t *testing.T) { sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 2, 2) defer sc.shutdown() // Will connect the leafnode to cluster C1. We will then connect the "client" to cluster C2 to cross gateways. - ln := sc.clusterForName("C1").createSingleLeafNodeNoSystemAccountAndEnablesJetStream() + ln := sc.clusterForName("C1").createSingleLeafNodeNoSystemAccountAndEnablesJetStreamWithDomain("C", "nojs") defer ln.Shutdown() c2 := sc.clusterForName("C2") - nc, js := jsClientConnect(t, c2.randomServer(), nats.UserInfo("nojs", "p")) + nc, js := jsClientConnectEx(t, c2.randomServer(), "C", nats.UserInfo("nojs", "p")) defer nc.Close() // Create a stream and add messages. @@ -7193,7 +7264,7 @@ func TestJetStreamClusterLeafDifferentAccounts(t *testing.T) { c := createJetStreamCluster(t, jsClusterAccountsTempl, "HUB", _EMPTY_, 2, 23133, false) defer c.shutdown() - ln := c.createLeafNodesWithStartPort("LN", 2, 22110) + ln := c.createLeafNodesWithStartPortAndDomain("LN", 2, 22110, _EMPTY_) defer ln.shutdown() // Wait on all peers. @@ -7692,6 +7763,8 @@ func TestJetStreamClusterNilMsgWithHeaderThroughSourcedStream(t *testing.T) { spoke := c.createLeafNodeWithTemplate("SPOKE", tmpl) defer spoke.Shutdown() + checkLeafNodeConnectedCount(t, spoke, 2) + // Client for API requests. 
nc, js := jsClientConnect(t, spoke) defer nc.Close() @@ -9332,6 +9405,8 @@ func TestJetStreamClusterStreamUpdateMissingBeginning(t *testing.T) { // Now shutdown. nsl.Shutdown() + // make sure a leader exists + c.waitOnStreamLeader("$G", "TEST") for i := 0; i < toSend; i++ { if _, err := js.PublishAsync("foo", msg); err != nil { @@ -9855,11 +9930,16 @@ func createJetStreamSuperClusterWithTemplate(t *testing.T, tmpl string, numServe func (sc *supercluster) createLeafNodes(clusterName string, numServers int) *cluster { // Create our leafnode cluster template first. - return sc.randomCluster().createLeafNodes(clusterName, numServers) + return sc.createLeafNodesWithDomain(clusterName, numServers, "") } -func (sc *supercluster) createSingleLeafNode() *Server { - return sc.randomCluster().createLeafNode() +func (sc *supercluster) createLeafNodesWithDomain(clusterName string, numServers int, domain string) *cluster { + // Create our leafnode cluster template first. + return sc.randomCluster().createLeafNodes(clusterName, numServers, domain) +} + +func (sc *supercluster) createSingleLeafNode(extend bool) *Server { + return sc.randomCluster().createLeafNode(extend) } func (sc *supercluster) leader() *Server { @@ -10253,20 +10333,29 @@ var jsLeafFrag = ` } ` -func (c *cluster) createLeafNodes(clusterName string, numServers int) *cluster { - return c.createLeafNodesWithStartPort(clusterName, numServers, 22111) +func (c *cluster) createLeafNodes(clusterName string, numServers int, domain string) *cluster { + return c.createLeafNodesWithStartPortAndDomain(clusterName, numServers, 22111, domain) } func (c *cluster) createLeafNodesNoJS(clusterName string, numServers int) *cluster { return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNodeNoJS, clusterName, numServers, 21333) } -func (c *cluster) createLeafNodesWithStartPort(clusterName string, numServers int, portStart int) *cluster { - return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNode, 
clusterName, numServers, portStart) +func (c *cluster) createLeafNodesWithStartPortAndDomain(clusterName string, numServers int, portStart int, domain string) *cluster { + if domain == _EMPTY_ { + return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNode, clusterName, numServers, portStart) + } + tmpl := strings.Replace(jsClusterTemplWithLeafNode, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, domain), 1) + return c.createLeafNodesWithTemplateAndStartPort(tmpl, clusterName, numServers, portStart) } -func (c *cluster) createLeafNode() *Server { - return c.createLeafNodeWithTemplate("LNS", jsClusterTemplWithSingleLeafNode) +func (c *cluster) createLeafNode(extend bool) *Server { + if extend { + return c.createLeafNodeWithTemplate("LNS", + strings.ReplaceAll(jsClusterTemplWithSingleLeafNode, "store_dir:", " extension_hint: will_extend, store_dir:")) + } else { + return c.createLeafNodeWithTemplate("LNS", jsClusterTemplWithSingleLeafNode) + } } func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server { @@ -10371,6 +10460,19 @@ func jsClientConnect(t *testing.T, s *Server, opts ...nats.Option) (*nats.Conn, return nc, js } +func jsClientConnectEx(t *testing.T, s *Server, domain string, opts ...nats.Option) (*nats.Conn, nats.JetStreamContext) { + t.Helper() + nc, err := nats.Connect(s.ClientURL(), opts...) 
+ if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + js, err := nc.JetStream(nats.MaxWait(10*time.Second), nats.Domain(domain)) + if err != nil { + t.Fatalf("Unexpected error getting JetStream context: %v", err) + } + return nc, js +} + func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) { t.Helper() checkFor(t, 10*time.Second, 20*time.Millisecond, func() error { @@ -10426,12 +10528,17 @@ func (c *cluster) waitOnPeerCount(n int) { c.waitOnLeader() leader = c.leader() } - expires := time.Now().Add(10 * time.Second) + expires := time.Now().Add(30 * time.Second) for time.Now().Before(expires) { if peers := leader.JetStreamClusterPeers(); len(peers) == n { return } time.Sleep(100 * time.Millisecond) + leader = c.leader() + for leader == nil { + c.waitOnLeader() + leader = c.leader() + } } c.t.Fatalf("Expected a cluster peer count of %d, got %d", n, len(leader.JetStreamClusterPeers())) } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 8d157caf..fb6a8c5e 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -4202,6 +4202,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { opts.Port = -1 tdir := createDir(t, "jstests-storedir-") opts.JetStream = true + opts.JetStreamDomain = "domain" opts.StoreDir = tdir rurl, _ := url.Parse(fmt.Sprintf("nats-leaf://%s:%d", lopts.LeafNode.Host, lopts.LeafNode.Port)) opts.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{rurl}}} @@ -4417,6 +4418,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { } // Now connect through a cluster server and make sure we can get things to work this way as well. + // This client, connecting to a leaf without shared system account and domain needs to provide the domain explicitly. nc2 := clientConnectToServer(t, ls) defer nc2.Close() // Wait a bit for interest to propagate. 
@@ -4425,7 +4427,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { snapshot = snapshot[:0] req, _ = json.Marshal(sreq) - rmsg, err = nc2.Request(fmt.Sprintf(JSApiStreamSnapshotT, mname), req, time.Second) + rmsg, err = nc2.Request(fmt.Sprintf(strings.ReplaceAll(JSApiStreamSnapshotT, JSApiPrefix, "$JS.domain.API"), mname), req, time.Second) if err != nil { t.Fatalf("Unexpected error on snapshot request: %v", err) } @@ -4450,7 +4452,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { state = mset.state() mset.delete() - rmsg, err = nc2.Request(fmt.Sprintf(JSApiStreamRestoreT, mname), req, time.Second) + rmsg, err = nc2.Request(strings.ReplaceAll(JSApiStreamRestoreT, JSApiPrefix, "$JS.domain.API"), req, time.Second) if err != nil { t.Fatalf("Unexpected error on snapshot request: %v", err) } @@ -4471,7 +4473,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { t.Fatalf("Expected restore subscription to be closed") } - rmsg, err = nc2.Request(fmt.Sprintf(JSApiStreamRestoreT, mname), req, time.Second) + rmsg, err = nc2.Request(strings.ReplaceAll(JSApiStreamRestoreT, JSApiPrefix, "$JS.domain.API"), req, time.Second) if err != nil { t.Fatalf("Unexpected error on snapshot request: %v", err) } @@ -11156,10 +11158,11 @@ func TestJetStreamLastSequenceBySubject(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } // Do manually for now. 
- nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + m, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + require_NoError(t, err) si, err := js.StreamInfo("KV") if err != nil { - t.Fatalf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v, respmsg: %q", err, string(m.Data)) } if si == nil || si.Config.Name != "KV" { t.Fatalf("StreamInfo is not correct %+v", si) diff --git a/server/leafnode.go b/server/leafnode.go index 9c198cc2..7d0acdea 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -34,6 +34,7 @@ import ( "sync/atomic" "time" + "github.com/nats-io/jwt/v2" "github.com/nats-io/nkeys" "github.com/nats-io/nuid" ) @@ -65,6 +66,8 @@ type leaf struct { remoteCluster string // remoteServer holds onto the remove server's name or ID. remoteServer string + // domain name of remote server + remoteDomain string // Used to suppress sub and unsub interest. Same as routes but our audience // here is tied to this leaf node. This will hold all subscriptions except this // leaf nodes. This represents all the interest we want to send to the other side. @@ -108,133 +111,54 @@ func (c *client) isHubLeafNode() bool { return c.kind == LEAF && !c.leaf.isSpoke } -// Will add in the deny exports and imports for JetStream on solicited connections if we -// are sharing the system account and wanting to extend the JS domain. -// r lock should be held. -func (s *Server) addInJSDeny(r *leafNodeCfg) { - s.addInJSDenyExport(r) - s.addInJSDenyImport(r) -} - -// Will add in the deny export for JetStream on solicited connections if we -// detect we have multiple JetStream domains and we know our local account -// is JetStream enabled. -// r lock should be held. 
-func (s *Server) addInJSDenyExport(r *leafNodeCfg) { - for _, dsubj := range r.DenyExports { - if dsubj == jsAllAPI { - return - } - } - - s.Noticef("Adding deny export of %q for leafnode configuration on %q that bridges system account", jsAllAPI, r.LocalAccount) - r.DenyExports = append(r.DenyExports, jsAllAPI) - - // We added in some deny clauses here so need to regenerate the permissions etc. - perms := &Permissions{} - perms.Publish = &SubjectPermission{Deny: r.DenyExports} - if len(r.DenyImports) > 0 { - perms.Subscribe = &SubjectPermission{Deny: r.DenyImports} - } - r.perms = perms -} - -// Will add in the deny import for JetStream on solicited connections if we -// detect we have multiple JetStream domains and we know our local account -// is JetStream enabled. -// r lock should be held. -func (s *Server) addInJSDenyImport(r *leafNodeCfg) { - for _, dsubj := range r.DenyImports { - if dsubj == jsAllAPI { - return - } - } - - s.Noticef("Adding deny import of %q for leafnode configuration on %q that bridges system account", jsAllAPI, r.LocalAccount) - r.DenyImports = append(r.DenyImports, jsAllAPI) - - // We added in some deny clauses here so need to regenerate the permissions etc. - perms := &Permissions{} - perms.Subscribe = &SubjectPermission{Deny: r.DenyImports} - if len(r.DenyExports) > 0 { - perms.Publish = &SubjectPermission{Deny: r.DenyExports} - } - r.perms = perms -} - -// Used for $SYS accounts when sharing but using separate JS domains. -// r lock should be held. 
-func (s *Server) addInJSDenyAll(r *leafNodeCfg) { - denyAll := []string{jscAllSubj, raftAllSubj, jsAllAPI} - - s.Noticef("Sharing system account but utilizing separate JetStream Domains") - s.Noticef("Adding deny of %+v for leafnode configuration that bridges system account", denyAll) - - hasDeny := func(deny string, l []string) bool { - for _, le := range l { - if le == deny { - return true - } - } - return false - } - - var exportAdded, importAdded bool - for _, deny := range denyAll { - if !hasDeny(deny, r.DenyExports) { - r.DenyExports = append(r.DenyExports, deny) - exportAdded = true - } - if !hasDeny(deny, r.DenyImports) { - r.DenyImports = append(r.DenyImports, deny) - importAdded = true - } - } - if !exportAdded && !importAdded { - return - } - - perms := &Permissions{} - if exportAdded { - perms.Publish = &SubjectPermission{Deny: r.DenyExports} - } - if importAdded { - perms.Subscribe = &SubjectPermission{Deny: r.DenyImports} - } - r.perms = perms -} - -// Determine if we are sharing our local system account with the remote. -func (s *Server) hasSystemRemoteLeaf() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.hasSystemRemoteLeafLocked() != nil -} - -func (s *Server) hasSystemRemoteLeafLocked() *leafNodeCfg { - if s.sys == nil { - return nil - } - - sacc := s.sys.account.Name - for _, r := range s.leafRemoteCfgs { - r.RLock() - lacc := r.LocalAccount - r.RUnlock() - if lacc == sacc { - return r - } - } - return nil -} - // This will spin up go routines to solicit the remote leaf node connections. 
 func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
- for _, r := range remotes {
+ sysAccName := _EMPTY_
+ sAcc := s.SystemAccount()
+ if sAcc != nil {
+ sysAccName = sAcc.Name
+ }
+ addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
 s.mu.Lock()
 remote := newLeafNodeCfg(r)
+ creds := remote.Credentials
+ accName := remote.LocalAccount
 s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
+ // Print a notice if the remote for the system account restricts export or import permissions.
+ if isSysAccRemote {
+ if len(remote.DenyExports) > 0 {
+ s.Noticef("Remote for System Account uses restricted export permissions")
+ }
+ if len(remote.DenyImports) > 0 {
+ s.Noticef("Remote for System Account uses restricted import permissions")
+ }
+ }
 s.mu.Unlock()
+ if creds != _EMPTY_ {
+ contents, err := ioutil.ReadFile(creds)
+ defer wipeSlice(contents)
+ if err != nil {
+ s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
+ } else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
+ s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
+ } else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
+ s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
+ } else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
+ s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
+ } else if isSysAccRemote {
+ if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
+ s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
+ }
+ } else {
+ if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
+ s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
+ }
+ }
+ }
+ return remote
+ }
+ for _, r := range remotes {
+ remote := addRemote(r, r.LocalAccount == sysAccName)
+ s.startGoRoutine(func() {
+ s.connectToRemoteLeafNode(remote, true)
}) } } @@ -684,12 +608,14 @@ func (s *Server) startLeafNodeAcceptLoop() { // RegEx to match a creds file with user JWT and Seed. var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-]{3,}[^\n]*[-]{3,}\n))`) +// clusterName is provided as argument to avoid lock ordering issues with the locked client c // Lock should be held entering here. func (c *client) sendLeafConnect(clusterName string, tlsRequired, headers bool) error { // We support basic user/pass and operator based user JWT with signatures. cinfo := leafConnectInfo{ TLS: tlsRequired, ID: c.srv.info.ID, + Domain: c.srv.info.Domain, Name: c.srv.info.Name, Hub: c.leaf.remote.Hub, Cluster: clusterName, @@ -869,7 +795,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf c.mu.Lock() c.initClient() - c.Noticef("Leafnode connection created%s", remoteSuffix) + c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name) if remote != nil { solicited = true @@ -990,8 +916,6 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf func (c *client) processLeafnodeInfo(info *Info) { s := c.srv - opts := s.getOpts() - hasSysShared, sysAcc := s.hasSystemRemoteLeaf(), s.SystemAccount() c.mu.Lock() if c.leaf == nil || c.isClosed() { @@ -1048,56 +972,10 @@ func (c *client) processLeafnodeInfo(info *Info) { } else { c.leaf.remoteServer = info.Name } - - // Check for JetStream semantics to deny the JetStream API as needed. - // This is so that if JetStream is enabled on both sides we can separately address both. - hasJSDomain := opts.JetStreamDomain != _EMPTY_ - inJSEnabledDomain := s.JetStreamEnabledForDomain() - - // Check for mixed mode scenarios to resolve presence of domain names. 
- if !s.JetStreamEnabled() && inJSEnabledDomain && !hasJSDomain && s.jetStreamHasDomainConfigured() { - hasJSDomain = true - } - - if remote, acc := c.leaf.remote, c.acc; remote != nil { - accHasJS := acc.jetStreamConfigured() - remote.Lock() - // JetStream checks for mappings and permissions updates. - if acc != sysAcc { - // Check if JetStream is enabled for this domain. If it's not, and the account - // does not have JS, we can act as pass through, so do not deny. - if hasSysShared && (inJSEnabledDomain || accHasJS) { - s.addInJSDeny(remote) - } else { - // Here we want to suppress if this local account has JS enabled. - // This is regardless of whether or not this server is actually running JS. - // We only suppress export. But we do send an indication about our JetStream - // status in the connect and the hub side will suppress as well if the remote - // account also has JetStream enabled. - if accHasJS { - s.addInJSDenyExport(remote) - // If we specified a domain do not import by default. - if hasJSDomain { - s.addInJSDenyImport(remote) - } - } - } - // If we have a specified JetStream domain we will want to add a mapping to - // allow access cross domain for each non-system account. - if hasJSDomain && accHasJS { - src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain) - if err := acc.AddMapping(src, jsAllAPI); err != nil { - c.Debugf("Error adding JetStream domain mapping: %v", err) - } - } - } else if hasJSDomain && opts.JetStreamDomain != info.Domain { - s.addInJSDenyAll(remote) - } - - c.setPermissions(remote.perms) - remote.Unlock() - } + c.leaf.remoteDomain = info.Domain + c.leaf.remoteCluster = info.Cluster } + // For both initial INFO and async INFO protocols, Possibly // update our list of remote leafnode URLs we can connect to. 
if c.leaf.remote != nil && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) { @@ -1264,9 +1142,14 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c var accName string c.mu.Lock() cid := c.cid - if c.acc != nil { - accName = c.acc.Name + acc := c.acc + if acc != nil { + accName = acc.Name } + myRemoteDomain := c.leaf.remoteDomain + mySrvName := c.leaf.remoteServer + myClustName := c.leaf.remoteCluster + solicited := c.leaf.remote != nil c.mu.Unlock() var old *client @@ -1300,6 +1183,119 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c old.closeConnection(DuplicateRemoteLeafnodeConnection) c.Warnf("Replacing connection from same server") } + + srvDecorated := func() string { + if myClustName == _EMPTY_ { + return mySrvName + } + return fmt.Sprintf("%s/%s", mySrvName, myClustName) + } + + opts := s.getOpts() + sysAcc := s.SystemAccount() + js := s.getJetStream() + var meta *raft + if js != nil { + if mg := js.getMetaGroup(); mg != nil { + meta = mg.(*raft) + } + } + blockMappingOutgoing := false + // Deny (non domain) JetStream API traffic unless system account is shared + // and domain names are identical and extending is not disabled + + // Check if backwards compatibility has been enabled and needs to be acted on + forceSysAccDeny := false + if len(opts.JsAccDefaultDomain) > 0 { + if acc == sysAcc { + for _, d := range opts.JsAccDefaultDomain { + if d == _EMPTY_ { + // Extending Js via leaf node is mutually exclusive with a domain mapping to the empty/default domain. + // As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node. 
+ c.Noticef("Forcing System Account into non extend mode due to presence of empty default domain") + forceSysAccDeny = true + break + } + } + } else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ { + // for backwards compatibility with old setups that do not have a domain name set + c.Noticef("Skipping deny %q for account %q due to default domain", jsAllAPI, accName) + return + } + } + + // If the server has JS disabled, it may still be part of a JetStream that could be extended. + // This is either signaled by js being disabled and a domain set, + // or in cases where no domain name exists, an extension hint is set. + // However, this is only relevant in mixed setups. + // + // If the system account connects but default domains are present, JetStream can't be extended. + if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) || + sysAcc == nil || acc == nil || forceSysAccDeny { + // If domain names mismatch always deny. This applies to system accounts as well as non system accounts. + // Not having a system account, account or JetStream disabled is considered a mismatch as well. + if acc != nil && acc == sysAcc { + c.Noticef("System Account Connected from %s", srvDecorated()) + c.Noticef("JetStream Not Extended, adding denies %+v", denyAllJs) + c.mergeDenyPermissionsLocked(both, denyAllJs) + // When a remote with a system account is present in a server, unless otherwise disabled, the server will be + // started in observer mode. Now that it is clear that this not used, turn the observer mode off. + if solicited && meta != nil && meta.isObserver() { + meta.setObserver(false, extNotExtended) + c.Noticef("Turning JetStream metadata controller Observer Mode off") + // Take note that the domain was not extended to avoid this state from startup. + writePeerState(js.config.StoreDir, meta.currentPeerState()) + // Meta controller can't be leader yet. 
+ // Yet it is possible that due to observer mode every server already stopped campaigning. + // Therefore this server needs to be kicked into campaigning gear explicitly. + meta.Campaign() + } + } else { + c.Noticef("JetStream Not Extended, adding deny %q for account %q", jsAllAPI, accName) + c.mergeDenyPermissionsLocked(both, []string{jsAllAPI}) + } + blockMappingOutgoing = true + } else if acc == sysAcc { + // system account and same domain + s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s", + myRemoteDomain, srvDecorated()) + // In an extension use case, pin leadership to server remotes connect to. + // Therefore, server with a remote that are not already in observer mode, need to be put into it. + if solicited && meta != nil && !meta.isObserver() { + meta.setObserver(true, extExtended) + c.Noticef("Turning JetStream metadata controller Observer Mode on - System Account Connected") + // Take note that the domain was not extended to avoid this state next startup. + writePeerState(js.config.StoreDir, meta.currentPeerState()) + // If this server is the leader already, step down so a new leader can be elected (that is not an observer) + meta.StepDown() + } + } else { + // This deny is needed in all cases (system account shared or not) + // If the system account is shared, jsAllAPI traffic will go through the system account. + // So in order to prevent duplicate delivery (from system and actual account) suppress it on the account. + // If the system account is NOT shared, jsAllAPI traffic has no business + c.Noticef("Adding deny %q for account %q", jsAllAPI, accName) + c.mergeDenyPermissionsLocked(both, []string{jsAllAPI}) + } + // If we have a specified JetStream domain we will want to add a mapping to + // allow access cross domain for each non-system account. 
+ if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && opts.JetStream { + src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain) + if err := acc.AddMapping(src, jsAllAPI); err != nil { + c.Debugf("Error adding JetStream domain mapping: %s", err.Error()) + } else { + c.Noticef("Adding JetStream Domain Mapping %q to account %q", src, accName) + } + if blockMappingOutgoing { + // make sure that messages intended for this domain, do not leave the cluster via this leaf node connection + // This is a guard against a miss-config with two identical domain names and will only cover some forms + // of this issue, not all of them. + // This guards against a hub and a spoke having the same domain name. + // But not two spokes having the same one and the request coming from the hub. + c.mergeDenyPermissionsLocked(pub, []string{src}) + c.Noticef("Adding deny %q for outgoing messages to account %q", src, accName) + } + } } func (s *Server) removeLeafNodeConnection(c *client) { @@ -1325,6 +1321,7 @@ type leafConnectInfo struct { TLS bool `json:"tls_required"` Comp bool `json:"compression,omitempty"` ID string `json:"server_id,omitempty"` + Domain string `json:"domain,omitempty"` Name string `json:"name,omitempty"` Hub bool `json:"is_hub,omitempty"` Cluster string `json:"cluster,omitempty"` @@ -1367,9 +1364,6 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro // Check if this server supports headers. supportHeaders := c.srv.supportsHeaders() - // Grab system account and server options. - sysAcc, opts := s.SystemAccount(), s.getOpts() - c.mu.Lock() // Leaf Nodes do not do echo or verbose or pedantic. 
c.opts.Verbose = false @@ -1393,6 +1387,8 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro c.leaf.remoteCluster = proto.Cluster } + c.leaf.remoteDomain = proto.Domain + // When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's // behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction. if !c.isSolicitedLeafNode() && c.perms != nil { @@ -1405,29 +1401,12 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro } } - // Check for JetStream domain - jsConfigured := c.acc.jetStreamConfigured() - doDomainMappings := opts.JetStreamDomain != _EMPTY_ && c.acc != sysAcc && jsConfigured - - // If we have JS enabled and the other side does as well we need to add in an import deny clause. - if jsConfigured && proto.JetStream { - c.mergePubDenyPermissions([]string{jsAllAPI}) - // We need to send this back to the other side. - if c.isHubLeafNode() { - if c.opts.Import == nil { - c.opts.Import = &SubjectPermission{} - } - c.opts.Import.Deny = append(c.opts.Import.Deny, jsAllAPI) - } - } - // Set the Ping timer s.setFirstPingTimer(c) // If we received pub deny permissions from the other end, merge with existing ones. - c.mergePubDenyPermissions(proto.DenyPub) + c.mergeDenyPermissions(pub, proto.DenyPub) - acc := c.acc c.mu.Unlock() // Add in the leafnode here since we passed through auth at this point. @@ -1445,15 +1424,6 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro // This will no-op as needed. s.sendLeafNodeConnect(c.acc) - // If we have a specified JetStream domain we will want to add a mapping to - // allow access cross domain for each non-system account. 
- if doDomainMappings { - src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain) - if err := acc.AddMapping(src, jsAllAPI); err != nil { - c.Debugf("Error adding JetStream domain mapping: %v", err) - } - } - return nil } @@ -1513,7 +1483,7 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { } // Check if we have an existing service import reply. - siReply := acc.siReply + siReply := copyBytes(acc.siReply) // Since leaf nodes only send on interest, if the bound // account has import services we need to send those over. diff --git a/server/leafnode_test.go b/server/leafnode_test.go index e7327699..4beef433 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -19,6 +19,7 @@ import ( "context" "crypto/tls" "fmt" + "io/ioutil" "math/rand" "net" "net/url" @@ -2017,14 +2018,24 @@ type proxyAcceptDetectFailureLate struct { l net.Listener srvs []net.Conn leaf net.Conn + startChan chan struct{} } func (p *proxyAcceptDetectFailureLate) run(t *testing.T) int { + return p.runEx(t, false) +} + +func (p *proxyAcceptDetectFailureLate) runEx(t *testing.T, needStart bool) int { l, err := natsListen("tcp", "127.0.0.1:0") if err != nil { t.Fatalf("Error on listen: %v", err) } p.Lock() + var startChan chan struct{} + if needStart { + startChan = make(chan struct{}) + p.startChan = startChan + } p.l = l p.Unlock() port := l.Addr().(*net.TCPAddr).Port @@ -2039,6 +2050,9 @@ func (p *proxyAcceptDetectFailureLate) run(t *testing.T) int { } p.Unlock() }() + if startChan != nil { + <-startChan + } for { c, err := l.Accept() if err != nil { @@ -2073,8 +2087,21 @@ func (p *proxyAcceptDetectFailureLate) run(t *testing.T) int { return port } +func (p *proxyAcceptDetectFailureLate) start() { + p.Lock() + if p.startChan != nil { + close(p.startChan) + p.startChan = nil + } + p.Unlock() +} + func (p *proxyAcceptDetectFailureLate) close() { p.Lock() + if p.startChan != nil { + close(p.startChan) + p.startChan = nil + } p.l.Close() p.Unlock() @@ -4072,3 +4099,949 @@ func 
TestLeafNodeInterestPropagationDaisychain(t *testing.T) { checkSubInterest(t, sB, "$G", "foo", time.Second) checkSubInterest(t, sAA, "$G", "foo", time.Second) // failure issue 2448 } + +func TestLeafNodeJetStreamClusterExtensionWithSystemAccount(t *testing.T) { + /* + Topologies tested here + same == true + A <-> B + ^ |\ + | \ + | proxy + | \ + LA <-> LB + + same == false + A <-> B + ^ ^ + | | + | proxy + | | + LA <-> LB + + The proxy is turned on later, such that the system account connection can be started later, in a controlled way + This explicitly tests the system state before and after this happens. + */ + + tmplA := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account: SYS +leafnodes: { + listen: 127.0.0.1:-1 + no_advertise: true + authorization: { + timeout: 0.5 + } +} +jetstream :{ + domain: "cluster" + store_dir: "%s" + max_mem: 100Mb + max_file: 100Mb +} +server_name: A +cluster: { + name: clust1 + listen: 127.0.0.1:50554 + routes=[nats-route://127.0.0.1:50555] + no_advertise: true +} +` + + tmplB := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account: SYS +leafnodes: { + listen: 127.0.0.1:-1 + no_advertise: true + authorization: { + timeout: 0.5 + } +} +jetstream: { + domain: "cluster" + store_dir: "%s" + max_mem: 100Mb + max_file: 100Mb +} +server_name: B +cluster: { + name: clust1 + listen: 127.0.0.1:50555 + routes=[nats-route://127.0.0.1:50554] + no_advertise: true +} +` + + tmplLA := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account = SYS +jetstream: { + domain: "cluster" + store_dir: %s + max_mem: 50Mb + max_file: 50Mb + %s +} +server_name: LA +cluster: { + name: clustL + listen: 127.0.0.1:50556 + routes=[nats-route://127.0.0.1:50557] + 
no_advertise: true +} +leafnodes:{ + no_advertise: true + remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A}, + {url:nats://s1:s1@127.0.0.1:%d, account: SYS}] +} +` + + tmplLB := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account = SYS +jetstream: { + domain: "cluster" + store_dir: %s + max_mem: 50Mb + max_file: 50Mb + %s +} +server_name: LB +cluster: { + name: clustL + listen: 127.0.0.1:50557 + routes=[nats-route://127.0.0.1:50556] + no_advertise: true +} +leafnodes:{ + no_advertise: true + remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A}, + {url:nats://s1:s1@127.0.0.1:%d, account: SYS}] +} +` + + for _, testCase := range []struct { + // which topology to pick + same bool + // If leaf server should be operational and form a Js cluster prior to joining. + // In this setup this would be an error as you give the wrong hint. + // But this should work itself out regardless + leafFunctionPreJoin bool + }{ + {true, true}, + {true, false}, + {false, true}, + {false, false}} { + t.Run(fmt.Sprintf("%t-%t", testCase.same, testCase.leafFunctionPreJoin), func(t *testing.T) { + sd1 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd1) + confA := createConfFile(t, []byte(fmt.Sprintf(tmplA, sd1))) + defer removeFile(t, confA) + sA, _ := RunServerWithConfig(confA) + defer sA.Shutdown() + + sd2 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd2) + confB := createConfFile(t, []byte(fmt.Sprintf(tmplB, sd2))) + defer removeFile(t, confB) + sB, _ := RunServerWithConfig(confB) + defer sB.Shutdown() + + checkClusterFormed(t, sA, sB) + + c := cluster{t: t, servers: []*Server{sA, sB}} + c.waitOnLeader() + + // starting this will allow the second remote in tmplL to successfully connect. 
+ port := sB.opts.LeafNode.Port + if testCase.same { + port = sA.opts.LeafNode.Port + } + p := &proxyAcceptDetectFailureLate{acceptPort: port} + defer p.close() + lPort := p.runEx(t, true) + + hint := "" + if testCase.leafFunctionPreJoin { + hint = fmt.Sprintf("extension_hint: %s", strings.ToUpper(jsNoExtend)) + } + + sd3 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd3) + // deliberately pick server sA and proxy + confLA := createConfFile(t, []byte(fmt.Sprintf(tmplLA, sd3, hint, sA.opts.LeafNode.Port, lPort))) + defer removeFile(t, confLA) + sLA, _ := RunServerWithConfig(confLA) + defer sLA.Shutdown() + + sd4 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd4) + // deliberately pick server sA and proxy + confLB := createConfFile(t, []byte(fmt.Sprintf(tmplLB, sd4, hint, sA.opts.LeafNode.Port, lPort))) + defer removeFile(t, confLB) + sLB, _ := RunServerWithConfig(confLB) + defer sLB.Shutdown() + + checkClusterFormed(t, sLA, sLB) + + strmCfg := func(name, placementCluster string) *nats.StreamConfig { + if placementCluster == "" { + return &nats.StreamConfig{Name: name, Replicas: 1, Subjects: []string{name}} + } + return &nats.StreamConfig{Name: name, Replicas: 1, Subjects: []string{name}, + Placement: &nats.Placement{Cluster: placementCluster}} + } + // Only after the system account is fully connected can streams be placed anywhere. 
+ testJSFunctions := func(pass bool) { + ncA := natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", sA.opts.Port)) + defer ncA.Close() + jsA, err := ncA.JetStream() + require_NoError(t, err) + _, err = jsA.AddStream(strmCfg(fmt.Sprintf("fooA1-%t", pass), "")) + require_NoError(t, err) + _, err = jsA.AddStream(strmCfg(fmt.Sprintf("fooA2-%t", pass), "clust1")) + require_NoError(t, err) + _, err = jsA.AddStream(strmCfg(fmt.Sprintf("fooA3-%t", pass), "clustL")) + if pass { + require_NoError(t, err) + } else { + require_Error(t, err) + require_Contains(t, err.Error(), "insufficient resources") + } + ncL := natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", sLA.opts.Port)) + defer ncL.Close() + jsL, err := ncL.JetStream() + require_NoError(t, err) + _, err = jsL.AddStream(strmCfg(fmt.Sprintf("fooL1-%t", pass), "")) + require_NoError(t, err) + _, err = jsL.AddStream(strmCfg(fmt.Sprintf("fooL2-%t", pass), "clustL")) + require_NoError(t, err) + _, err = jsL.AddStream(strmCfg(fmt.Sprintf("fooL3-%t", pass), "clust1")) + if pass { + require_NoError(t, err) + } else { + require_Error(t, err) + require_Contains(t, err.Error(), "insufficient resources") + } + } + clusterLnCnt := func(expected int) error { + cnt := 0 + for _, s := range c.servers { + cnt += s.NumLeafNodes() + } + if cnt == expected { + return nil + } + return fmt.Errorf("not enought leaf node connections, got %d needed %d", cnt, expected) + } + + // Even though there are two remotes defined in tmplL, only one will be able to connect. 
+ checkFor(t, 10*time.Second, time.Second/4, func() error { return clusterLnCnt(2) })
+ checkLeafNodeConnectedCount(t, sLA, 1)
+ checkLeafNodeConnectedCount(t, sLB, 1)
+ c.waitOnPeerCount(2)
+
+ if testCase.leafFunctionPreJoin {
+ cl := cluster{t: t, servers: []*Server{sLA, sLB}}
+ cl.waitOnLeader()
+ cl.waitOnPeerCount(2)
+ testJSFunctions(false)
+ } else {
+ // In cases where the leaf nodes have to wait for the system account to connect,
+ // JetStream should not be operational during that time
+ ncA := natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", sLA.opts.Port))
+ defer ncA.Close()
+ jsA, err := ncA.JetStream()
+ require_NoError(t, err)
+ _, err = jsA.AddStream(strmCfg("fail-false", ""))
+ require_Error(t, err)
+ }
+ // Starting the proxy will connect the system accounts.
+ // After they are connected the clusters are merged.
+ // Once this happened, all streams in test can be placed anywhere in the cluster.
+ // Before that only the cluster the client is connected to can be used for placement
+ p.start()
+
+ // Now that the proxy accepts connections, the second (system account) remote on each leaf can connect as well.
+ checkFor(t, 10*time.Second, time.Second/4, func() error { return clusterLnCnt(4) }) + checkLeafNodeConnectedCount(t, sLA, 2) + checkLeafNodeConnectedCount(t, sLB, 2) + + // The leader will reside in the main cluster only + c.waitOnPeerCount(4) + testJSFunctions(true) + }) + } +} + +func TestLeafNodeJetStreamClusterMixedModeExtensionWithSystemAccount(t *testing.T) { + /* Topology used in this test: + CLUSTER(A <-> B <-> C (NO JS)) + ^ + | + LA + */ + + // once every server is up, we expect these peers to be part of the JetStream meta cluster + expectedJetStreamPeers := map[string]struct{}{ + "A": {}, + "B": {}, + "LA": {}, + } + + tmplA := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account: SYS +leafnodes: { + listen: 127.0.0.1:-1 + no_advertise: true + authorization: { + timeout: 0.5 + } +} +jetstream: { %s store_dir: %s; max_mem: 50Mb, max_file: 50Mb } +server_name: A +cluster: { + name: clust1 + listen: 127.0.0.1:50554 + routes=[nats-route://127.0.0.1:50555,nats-route://127.0.0.1:50556] + no_advertise: true +} +` + + tmplB := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account: SYS +leafnodes: { + listen: 127.0.0.1:-1 + no_advertise: true + authorization: { + timeout: 0.5 + } +} +jetstream: { %s store_dir: %s; max_mem: 50Mb, max_file: 50Mb } +server_name: B +cluster: { + name: clust1 + listen: 127.0.0.1:50555 + routes=[nats-route://127.0.0.1:50554,nats-route://127.0.0.1:50556] + no_advertise: true +} +` + + tmplC := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account: SYS +leafnodes: { + listen: 127.0.0.1:-1 + no_advertise: true + authorization: { + timeout: 0.5 + } +} +jetstream: { + enabled: false + %s +} +server_name: C +cluster: { + name: clust1 + 
listen: 127.0.0.1:50556 + routes=[nats-route://127.0.0.1:50554,nats-route://127.0.0.1:50555] + no_advertise: true +} +` + + tmplLA := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account = SYS +# the extension hint is to simplify this test. without it present we would need a cluster of size 2 +jetstream: { %s store_dir: %s; max_mem: 50Mb, max_file: 50Mb, extension_hint: will_extend } +server_name: LA +leafnodes:{ + no_advertise: true + remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A}, + {url:nats://s1:s1@127.0.0.1:%d, account: SYS}] +} +# add the cluster here so we can test placement +cluster: { name: clustL } +` + for _, withDomain := range []bool{true, false} { + t.Run(fmt.Sprintf("with-domain:%t", withDomain), func(t *testing.T) { + jsDisabledDomainString := _EMPTY_ + jsEnabledDomainString := _EMPTY_ + if withDomain { + jsEnabledDomainString = `domain: "domain", ` + jsDisabledDomainString = `domain: "domain"` + } else { + // in case no domain name is set, fall back to the extension hint. + // since JS is disabled, the value of this does not clash with other uses. 
+ jsDisabledDomainString = "extension_hint: will_extend" + } + + sd1 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd1) + confA := createConfFile(t, []byte(fmt.Sprintf(tmplA, jsEnabledDomainString, sd1))) + defer removeFile(t, confA) + sA, _ := RunServerWithConfig(confA) + defer sA.Shutdown() + + sd2 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd2) + confB := createConfFile(t, []byte(fmt.Sprintf(tmplB, jsEnabledDomainString, sd2))) + defer removeFile(t, confB) + sB, _ := RunServerWithConfig(confB) + defer sB.Shutdown() + + confC := createConfFile(t, []byte(fmt.Sprintf(tmplC, jsDisabledDomainString))) + defer removeFile(t, confC) + sC, _ := RunServerWithConfig(confC) + defer sC.Shutdown() + + checkClusterFormed(t, sA, sB, sC) + c := cluster{t: t, servers: []*Server{sA, sB, sC}} + c.waitOnPeerCount(2) + + sd3 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd3) + // deliberately pick server sC (no JS) to connect to + confLA := createConfFile(t, []byte(fmt.Sprintf(tmplLA, jsEnabledDomainString, sd3, sC.opts.LeafNode.Port, sC.opts.LeafNode.Port))) + defer removeFile(t, confLA) + sLA, _ := RunServerWithConfig(confLA) + defer sLA.Shutdown() + + checkLeafNodeConnectedCount(t, sC, 2) + checkLeafNodeConnectedCount(t, sLA, 2) + c.waitOnPeerCount(3) + peers := c.leader().JetStreamClusterPeers() + for _, peer := range peers { + if _, ok := expectedJetStreamPeers[peer]; !ok { + t.Fatalf("Found unexpected peer %q", peer) + } + } + + // helper to create stream config with uniqe name and subject + cnt := 0 + strmCfg := func(placementCluster string) *nats.StreamConfig { + name := fmt.Sprintf("s-%d", cnt) + cnt++ + if placementCluster == "" { + return &nats.StreamConfig{Name: name, Replicas: 1, Subjects: []string{name}} + } + return &nats.StreamConfig{Name: name, Replicas: 1, Subjects: []string{name}, + Placement: &nats.Placement{Cluster: placementCluster}} + } + + test := func(port int, expectedDefPlacement string) { + ncA := natsConnect(t, 
fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", port)) + defer ncA.Close() + jsA, err := ncA.JetStream() + require_NoError(t, err) + si, err := jsA.AddStream(strmCfg("")) + require_NoError(t, err) + require_Contains(t, si.Cluster.Name, expectedDefPlacement) + si, err = jsA.AddStream(strmCfg("clust1")) + require_NoError(t, err) + require_Contains(t, si.Cluster.Name, "clust1") + si, err = jsA.AddStream(strmCfg("clustL")) + require_NoError(t, err) + require_Contains(t, si.Cluster.Name, "clustL") + } + + test(sA.opts.Port, "clust1") + test(sB.opts.Port, "clust1") + test(sC.opts.Port, "clust1") + test(sLA.opts.Port, "clustL") + }) + } +} + +func TestLeafNodeJetStreamCredsDenies(t *testing.T) { + tmplL := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account = SYS +jetstream: { + domain: "cluster" + store_dir: %s + max_mem: 50Mb + max_file: 50Mb +} +leafnodes:{ + remotes:[{url:nats://a1:a1@127.0.0.1:50555, account: A, credentials: %s }, + {url:nats://s1:s1@127.0.0.1:50555, account: SYS, credentials: %s, deny_imports: foo, deny_exports: bar}] +} +` + akp, err := nkeys.CreateAccount() + require_NoError(t, err) + creds := createUserWithLimit(t, akp, time.Time{}, func(pl *jwt.UserPermissionLimits) { + pl.Pub.Deny.Add(jsAllAPI) + pl.Sub.Deny.Add(jsAllAPI) + }) + + sd := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd) + + confL := createConfFile(t, []byte(fmt.Sprintf(tmplL, sd, creds, creds))) + defer removeFile(t, confL) + opts := LoadConfig(confL) + sL, err := NewServer(opts) + require_NoError(t, err) + + l := captureNoticeLogger{} + sL.SetLogger(&l, false, false) + + go sL.Start() + defer sL.Shutdown() + + // wait till the notices got printed +UNTIL_READY: + for { + <-time.After(50 * time.Millisecond) + l.Lock() + for _, n := range l.notices { + if strings.Contains(n, "Server is ready") { + l.Unlock() + break UNTIL_READY + } + } + l.Unlock() + } + + l.Lock() + cnt := 0 + 
for _, n := range l.notices { + if strings.Contains(n, "LeafNode Remote for Account A uses credentials file") || + strings.Contains(n, "LeafNode Remote for System Account uses") || + strings.Contains(n, "Remote for System Account uses restricted export permissions") || + strings.Contains(n, "Remote for System Account uses restricted import permissions") { + cnt++ + } + } + l.Unlock() + require_True(t, cnt == 4) +} + +func TestLeafNodeJetStreamDefaultDomainCfg(t *testing.T) { + tmplHub := ` +listen: 127.0.0.1:%d +accounts :{ + A:{ jetstream: %s, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account: SYS +jetstream : %s +server_name: HUB +leafnodes: { + listen: 127.0.0.1:%d +} +%s +` + + tmplL := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account: SYS +jetstream: { domain: "%s", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +server_name: LEAF +leafnodes: { + remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},%s] +} +%s +` + + test := func(domain string, sysShared bool) { + confHub := createConfFile(t, []byte(fmt.Sprintf(tmplHub, -1, "disabled", "disabled", -1, ""))) + defer removeFile(t, confHub) + sHub, _ := RunServerWithConfig(confHub) + defer sHub.Shutdown() + + noDomainFix := "" + if domain == _EMPTY_ { + noDomainFix = `default_js_domain:{A:""}` + } + + sys := "" + if sysShared { + sys = fmt.Sprintf(`{url:nats://s1:s1@127.0.0.1:%d, account: SYS}`, sHub.opts.LeafNode.Port) + } + + sdLeaf := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sdLeaf) + confL := createConfFile(t, []byte(fmt.Sprintf(tmplL, domain, sdLeaf, sHub.opts.LeafNode.Port, sys, noDomainFix))) + defer removeFile(t, confL) + sLeaf, _ := RunServerWithConfig(confL) + defer sLeaf.Shutdown() + + lnCnt := 1 + if sysShared { + lnCnt++ + } + + checkLeafNodeConnectedCount(t, sHub, lnCnt) + checkLeafNodeConnectedCount(t, sLeaf, lnCnt) + + ncA := 
natsConnect(t, fmt.Sprintf("nats://a1:a1@127.0.0.1:%d", sHub.opts.Port)) + defer ncA.Close() + jsA, err := ncA.JetStream() + require_NoError(t, err) + + _, err = jsA.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 1, Subjects: []string{"foo"}}) + require_True(t, err == nats.ErrNoResponders) + + // Add in default domain and restart server + require_NoError(t, ioutil.WriteFile(confHub, []byte(fmt.Sprintf(tmplHub, + sHub.opts.Port, + "disabled", + "disabled", + sHub.opts.LeafNode.Port, + fmt.Sprintf(`default_js_domain: {A:"%s"}`, domain))), 0664)) + + sHub.Shutdown() + sHub.WaitForShutdown() + checkLeafNodeConnectedCount(t, sLeaf, 0) + sHubUpd1, _ := RunServerWithConfig(confHub) + defer sHubUpd1.Shutdown() + + checkLeafNodeConnectedCount(t, sHubUpd1, lnCnt) + checkLeafNodeConnectedCount(t, sLeaf, lnCnt) + + _, err = jsA.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 1, Subjects: []string{"foo"}}) + require_NoError(t, err) + + // Enable jetstream in hub. + sdHub := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sdHub) + jsEnabled := fmt.Sprintf(`{ domain: "%s", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb }`, domain, sdHub) + require_NoError(t, ioutil.WriteFile(confHub, []byte(fmt.Sprintf(tmplHub, + sHubUpd1.opts.Port, + "disabled", + jsEnabled, + sHubUpd1.opts.LeafNode.Port, + fmt.Sprintf(`default_js_domain: {A:"%s"}`, domain))), 0664)) + + sHubUpd1.Shutdown() + sHubUpd1.WaitForShutdown() + checkLeafNodeConnectedCount(t, sLeaf, 0) + sHubUpd2, _ := RunServerWithConfig(confHub) + defer sHubUpd2.Shutdown() + + checkLeafNodeConnectedCount(t, sHubUpd2, lnCnt) + checkLeafNodeConnectedCount(t, sLeaf, lnCnt) + + _, err = jsA.AddStream(&nats.StreamConfig{Name: "bar", Replicas: 1, Subjects: []string{"bar"}}) + require_NoError(t, err) + + // Enable jetstream in account A of hub + // This is a mis config, as you can't have it both ways, local jetstream but default to another one + require_NoError(t, ioutil.WriteFile(confHub, []byte(fmt.Sprintf(tmplHub, + 
sHubUpd2.opts.Port, + "enabled", + jsEnabled, + sHubUpd2.opts.LeafNode.Port, + fmt.Sprintf(`default_js_domain: {A:"%s"}`, domain))), 0664)) + + if domain != _EMPTY_ { + // in case no domain name exists there are no additional guard rails, hence no error + // It is the users responsibility to get this edge case right + sHubUpd2.Shutdown() + sHubUpd2.WaitForShutdown() + checkLeafNodeConnectedCount(t, sLeaf, 0) + sHubUpd3, err := NewServer(LoadConfig(confHub)) + sHubUpd3.Shutdown() + + require_Error(t, err) + require_Contains(t, err.Error(), `default_js_domain contains account name "A" with enabled JetStream`) + } + } + + t.Run("with-domain-sys", func(t *testing.T) { + test("domain", true) + }) + t.Run("with-domain-nosys", func(t *testing.T) { + test("domain", false) + }) + t.Run("no-domain", func(t *testing.T) { + test("", true) + }) + t.Run("no-domain", func(t *testing.T) { + test("", false) + }) +} + +func TestLeafNodeJetStreamDefaultDomainJwtExplicit(t *testing.T) { + tmplHub := ` +listen: 127.0.0.1:%d +operator: %s +system_account: %s +resolver: MEM +resolver_preload: { + %s:%s + %s:%s +} +jetstream : disabled +server_name: HUB +leafnodes: { + listen: 127.0.0.1:%d +} +%s +` + + tmplL := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enable, users:[ {user:a1,password:a1}]}, + SYS:{ users:[ {user:s1,password:s1}]}, +} +system_account: SYS +jetstream: { domain: "%s", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +server_name: LEAF +leafnodes: { + remotes:[{url:nats://127.0.0.1:%d, account: A, credentials: %s}, + {url:nats://127.0.0.1:%d, account: SYS, credentials: %s}] +} +%s +` + + test := func(domain string) { + noDomainFix := "" + if domain == _EMPTY_ { + noDomainFix = `default_js_domain:{A:""}` + } + + sysKp, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + sysCreds := newUser(t, sysKp) + defer removeFile(t, sysCreds) + + aKp, aPub := createKey(t) + aClaim := jwt.NewAccountClaims(aPub) + aJwt := encodeClaim(t, 
aClaim, aPub) + aCreds := newUser(t, aKp) + defer removeFile(t, aCreds) + + confHub := createConfFile(t, []byte(fmt.Sprintf(tmplHub, -1, ojwt, syspub, syspub, sysJwt, aPub, aJwt, -1, ""))) + defer removeFile(t, confHub) + sHub, _ := RunServerWithConfig(confHub) + defer sHub.Shutdown() + + sdLeaf := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sdLeaf) + confL := createConfFile(t, []byte(fmt.Sprintf(tmplL, + domain, + sdLeaf, + sHub.opts.LeafNode.Port, + aCreds, + sHub.opts.LeafNode.Port, + sysCreds, + noDomainFix))) + defer removeFile(t, confL) + sLeaf, _ := RunServerWithConfig(confL) + defer sLeaf.Shutdown() + + checkLeafNodeConnectedCount(t, sHub, 2) + checkLeafNodeConnectedCount(t, sLeaf, 2) + + ncA := natsConnect(t, fmt.Sprintf("nats://127.0.0.1:%d", sHub.opts.Port), createUserCreds(t, nil, aKp)) + defer ncA.Close() + jsA, err := ncA.JetStream() + require_NoError(t, err) + + _, err = jsA.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 1, Subjects: []string{"foo"}}) + require_True(t, err == nats.ErrNoResponders) + + // Add in default domain and restart server + require_NoError(t, ioutil.WriteFile(confHub, []byte(fmt.Sprintf(tmplHub, + sHub.opts.Port, ojwt, syspub, syspub, sysJwt, aPub, aJwt, sHub.opts.LeafNode.Port, + fmt.Sprintf(`default_js_domain: {%s:"%s"}`, aPub, domain))), 0664)) + + sHub.Shutdown() + sHub.WaitForShutdown() + checkLeafNodeConnectedCount(t, sLeaf, 0) + sHubUpd1, _ := RunServerWithConfig(confHub) + defer sHubUpd1.Shutdown() + + checkLeafNodeConnectedCount(t, sHubUpd1, 2) + checkLeafNodeConnectedCount(t, sLeaf, 2) + + _, err = jsA.AddStream(&nats.StreamConfig{Name: "bar", Replicas: 1, Subjects: []string{"bar"}}) + require_NoError(t, err) + } + t.Run("with-domain", func(t *testing.T) { + test("domain") + }) + t.Run("no-domain", func(t *testing.T) { + test("") + }) +} + +func TestLeafNodeJetStreamDefaultDomainClusterBothEnds(t *testing.T) { + // test to ensure that default domain functions when both ends of the leaf node connection 
are clusters + tmplHub1 := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, + B:{ jetstream: enabled, users:[ {user:b1,password:b1}]} +} +jetstream : { domain: "DHUB", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +server_name: HUB1 +cluster: { + name: HUB + listen: 127.0.0.1:50554 + routes=[nats-route://127.0.0.1:50555] +} +leafnodes: { + listen:127.0.0.1:-1 +} +` + + tmplHub2 := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, + B:{ jetstream: enabled, users:[ {user:b1,password:b1}]} +} +jetstream : { domain: "DHUB", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +server_name: HUB2 +cluster: { + name: HUB + listen: 127.0.0.1:50555 + routes=[nats-route://127.0.0.1:50554] +} +leafnodes: { + listen:127.0.0.1:-1 +} +` + + tmplL1 := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, + B:{ jetstream: disabled, users:[ {user:b1,password:b1}]} +} +jetstream: { domain: "DLEAF", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +server_name: LEAF1 +cluster: { + name: LEAF + listen: 127.0.0.1:50556 + routes=[nats-route://127.0.0.1:50557] +} +leafnodes: { + remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},{url:nats://b1:b1@127.0.0.1:%d, account: B}] +} +default_js_domain: {B:"DHUB"} +` + + tmplL2 := ` +listen: 127.0.0.1:-1 +accounts :{ + A:{ jetstream: enabled, users:[ {user:a1,password:a1}]}, + B:{ jetstream: disabled, users:[ {user:b1,password:b1}]} +} +jetstream: { domain: "DLEAF", store_dir: "%s", max_mem: 100Mb, max_file: 100Mb } +server_name: LEAF2 +cluster: { + name: LEAF + listen: 127.0.0.1:50557 + routes=[nats-route://127.0.0.1:50556] +} +leafnodes: { + remotes:[{url:nats://a1:a1@127.0.0.1:%d, account: A},{url:nats://b1:b1@127.0.0.1:%d, account: B}] +} +default_js_domain: {B:"DHUB"} +` + + sd1 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd1) + confHub1 := createConfFile(t, []byte(fmt.Sprintf(tmplHub1, 
sd1))) + defer removeFile(t, confHub1) + sHub1, _ := RunServerWithConfig(confHub1) + defer sHub1.Shutdown() + + sd2 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd2) + confHub2 := createConfFile(t, []byte(fmt.Sprintf(tmplHub2, sd2))) + defer removeFile(t, confHub2) + sHub2, _ := RunServerWithConfig(confHub2) + defer sHub2.Shutdown() + + checkClusterFormed(t, sHub1, sHub2) + c1 := cluster{t: t, servers: []*Server{sHub1, sHub2}} + c1.waitOnPeerCount(2) + + sd3 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd3) + confLeaf1 := createConfFile(t, []byte(fmt.Sprintf(tmplL1, sd3, sHub1.getOpts().LeafNode.Port, sHub1.getOpts().LeafNode.Port))) + defer removeFile(t, confLeaf1) + sLeaf1, _ := RunServerWithConfig(confLeaf1) + defer sLeaf1.Shutdown() + + sd4 := createDir(t, JetStreamStoreDir) + defer os.RemoveAll(sd4) + confLeaf2 := createConfFile(t, []byte(fmt.Sprintf(tmplL2, sd3, sHub1.getOpts().LeafNode.Port, sHub1.getOpts().LeafNode.Port))) + defer removeFile(t, confLeaf2) + sLeaf2, _ := RunServerWithConfig(confLeaf2) + defer sLeaf2.Shutdown() + + checkClusterFormed(t, sLeaf1, sLeaf2) + c2 := cluster{t: t, servers: []*Server{sLeaf1, sLeaf2}} + c2.waitOnPeerCount(2) + + checkLeafNodeConnectedCount(t, sHub1, 4) + checkLeafNodeConnectedCount(t, sLeaf1, 2) + checkLeafNodeConnectedCount(t, sLeaf2, 2) + + ncB := natsConnect(t, fmt.Sprintf("nats://b1:b1@127.0.0.1:%d", sLeaf1.getOpts().Port)) + defer ncB.Close() + jsB1, err := ncB.JetStream() + require_NoError(t, err) + si, err := jsB1.AddStream(&nats.StreamConfig{Name: "foo", Replicas: 1, Subjects: []string{"foo"}}) + require_NoError(t, err) + require_Equal(t, si.Cluster.Name, "HUB") + + jsB2, err := ncB.JetStream(nats.Domain("DHUB")) + require_NoError(t, err) + si, err = jsB2.AddStream(&nats.StreamConfig{Name: "bar", Replicas: 1, Subjects: []string{"bar"}}) + require_NoError(t, err) + require_Equal(t, si.Cluster.Name, "HUB") + +} diff --git a/server/mqtt.go b/server/mqtt.go index 94d4d669..63118893 100644 --- 
a/server/mqtt.go +++ b/server/mqtt.go @@ -246,14 +246,16 @@ type sessPersistRecord struct { } type mqttJSA struct { - mu sync.Mutex - id string - c *client - sendq chan *mqttJSPubMsg - rplyr string - replies sync.Map - nuid *nuid.NUID - quitCh chan struct{} + mu sync.Mutex + id string + c *client + sendq chan *mqttJSPubMsg + rplyr string + replies sync.Map + nuid *nuid.NUID + quitCh chan struct{} + domain string // Domain or possibly empty. This is added to session subject. + domainSet bool // covers if domain was set, even to empty } type mqttJSPubMsg struct { @@ -931,7 +933,6 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc id := string(getHash(s.Name())) replicas := s.mqttDetermineReplicas() - s.Noticef("Creating MQTT streams/consumers with replicas %v for account %q", replicas, accName) as := &mqttAccountSessionManager{ sessions: make(map[string]*mqttSession), sessByHash: make(map[string]*mqttSession), @@ -950,10 +951,39 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc ch: make(chan struct{}, 1), }, } + // TODO record domain name in as here + + // The domain to communicate with may be required for JS calls. + // Search from specific (per account setting) to generic (mqtt setting) + opts := s.getOpts() + if opts.JsAccDefaultDomain != nil { + if d, ok := opts.JsAccDefaultDomain[accName]; ok { + if d != _EMPTY_ { + as.jsa.domain = d + } + as.jsa.domainSet = true + } + // in case domain was set to empty, check if there are more generic domain overwrites + } + if as.jsa.domain == _EMPTY_ { + if d := opts.MQTT.JsDomain; d != _EMPTY_ { + as.jsa.domain = d + as.jsa.domainSet = true + } + } // We need to include the domain in the subject prefix used to store sessions in the $MQTT_sess stream. - if d := s.getOpts().JetStreamDomain; d != _EMPTY_ { + if as.jsa.domainSet { + if as.jsa.domain != _EMPTY_ { + as.domainTk = as.jsa.domain + "." 
+ } + } else if d := s.getOpts().JetStreamDomain; d != _EMPTY_ { as.domainTk = d + "." } + if as.jsa.domainSet { + s.Noticef("Creating MQTT streams/consumers with replicas %v for account %q in domain %q", replicas, accName, as.jsa.domain) + } else { + s.Noticef("Creating MQTT streams/consumers with replicas %v for account %q", replicas, accName) + } var subs []*subscription var success bool @@ -1156,6 +1186,16 @@ func (jsa *mqttJSA) newRequest(kind, subject string, hdr int, msg []byte) (inter return jsa.newRequestEx(kind, subject, hdr, msg, mqttJSAPITimeout) } +func (jsa *mqttJSA) prefixDomain(subject string) string { + if jsa.domain != _EMPTY_ { + // rewrite js api prefix with domain + if sub := strings.TrimPrefix(subject, JSApiPrefix+"."); sub != subject { + subject = fmt.Sprintf("$JS.%s.API.%s", jsa.domain, sub) + } + } + return subject +} + func (jsa *mqttJSA) newRequestEx(kind, subject string, hdr int, msg []byte, timeout time.Duration) (interface{}, error) { jsa.mu.Lock() // Either we use nuid.Next() which uses a global lock, or our own nuid object, but @@ -1172,6 +1212,7 @@ func (jsa *mqttJSA) newRequestEx(kind, subject string, hdr int, msg []byte, time ch := make(chan interface{}, 1) jsa.replies.Store(reply, ch) + subject = jsa.prefixDomain(subject) jsa.sendq <- &mqttJSPubMsg{ subj: subject, reply: reply, @@ -1300,14 +1341,15 @@ func (jsa *mqttJSA) storeMsgWithKind(kind, subject string, headers int, msg []by func (jsa *mqttJSA) deleteMsg(stream string, seq uint64, wait bool) error { dreq := JSApiMsgDeleteRequest{Seq: seq, NoErase: true} req, _ := json.Marshal(dreq) + subj := jsa.prefixDomain(fmt.Sprintf(JSApiMsgDeleteT, stream)) if !wait { jsa.sendq <- &mqttJSPubMsg{ - subj: fmt.Sprintf(JSApiMsgDeleteT, stream), + subj: subj, msg: req, } return nil } - dmi, err := jsa.newRequest(mqttJSAMsgDelete, fmt.Sprintf(JSApiMsgDeleteT, stream), 0, req) + dmi, err := jsa.newRequest(mqttJSAMsgDelete, subj, 0, req) if err != nil { return err } @@ -1609,7 +1651,8 @@ func 
(as *mqttAccountSessionManager) createSubscription(subject string, cb msgHa // No lock held on entry. func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, accName string, closeCh chan struct{}) { var cluster string - if s.JetStreamEnabled() { + if s.JetStreamEnabled() && !as.jsa.domainSet { + // Only request the own cluster when it is clear that cluster = s.cachedClusterName() } as.mu.RLock() @@ -2227,7 +2270,7 @@ func (sess *mqttSession) clear() error { sess.mu.Unlock() for _, dur := range durs { - sess.jsa.sendq <- &mqttJSPubMsg{subj: fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, dur)} + sess.jsa.sendq <- &mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, dur))} } if seq > 0 { if err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true); err != nil { @@ -2380,7 +2423,7 @@ func (sess *mqttSession) deleteConsumer(cc *ConsumerConfig) { sess.mu.Lock() sess.tmaxack -= cc.MaxAckPending sess.mu.Unlock() - sess.jsa.sendq <- &mqttJSPubMsg{subj: fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable)} + sess.jsa.sendq <- &mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))} } ////////////////////////////////////////////////////////////////////////////// diff --git a/server/mqtt_test.go b/server/mqtt_test.go index e4b8b98f..e39cf0af 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -3130,88 +3130,149 @@ func TestMQTTClusterPlacement(t *testing.T) { } } -func TestMQTTLeafnodeWithoutJSToClusterWithJS(t *testing.T) { - getClusterOpts := func(name string, i int) *Options { - o := testMQTTDefaultOptions() - o.ServerName = name - o.Cluster.Name = "hub" - o.Cluster.Host = "127.0.0.1" - o.Cluster.Port = 2790 + i - o.Routes = RoutesFromStr("nats://127.0.0.1:2791,nats://127.0.0.1:2792,nats://127.0.0.1:2793") - o.LeafNode.Host = "127.0.0.1" - o.LeafNode.Port = -1 - return o - } - o1 := getClusterOpts("S1", 1) - s1 := testMQTTRunServer(t, o1) - defer 
testMQTTShutdownServer(s1) +func TestMQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc(t *testing.T) { + test := func(resolution int) { + getClusterOpts := func(name string, i int) *Options { + o := testMQTTDefaultOptions() + o.ServerName = name + o.Cluster.Name = "hub" + // first two test cases rely on domain not being set in hub + if resolution > 1 { + o.JetStreamDomain = "DOMAIN" + } + o.Cluster.Host = "127.0.0.1" + o.Cluster.Port = 2790 + i + o.Routes = RoutesFromStr("nats://127.0.0.1:2791,nats://127.0.0.1:2792,nats://127.0.0.1:2793") + o.LeafNode.Host = "127.0.0.1" + o.LeafNode.Port = -1 + return o + } + o1 := getClusterOpts("S1", 1) + s1 := testMQTTRunServer(t, o1) + defer testMQTTShutdownServer(s1) - o2 := getClusterOpts("S2", 2) - s2 := testMQTTRunServer(t, o2) - defer testMQTTShutdownServer(s2) + o2 := getClusterOpts("S2", 2) + s2 := testMQTTRunServer(t, o2) + defer testMQTTShutdownServer(s2) - o3 := getClusterOpts("S3", 3) - s3 := testMQTTRunServer(t, o3) - defer testMQTTShutdownServer(s3) + o3 := getClusterOpts("S3", 3) + s3 := testMQTTRunServer(t, o3) + defer testMQTTShutdownServer(s3) - cluster := []*Server{s1, s2, s3} - checkClusterFormed(t, cluster...) - checkFor(t, 10*time.Second, 50*time.Millisecond, func() error { - for _, s := range cluster { - if s.JetStreamIsLeader() { - return nil + cluster := []*Server{s1, s2, s3} + checkClusterFormed(t, cluster...) + checkFor(t, 10*time.Second, 50*time.Millisecond, func() error { + for _, s := range cluster { + if s.JetStreamIsLeader() { + return nil + } + } + return fmt.Errorf("no leader yet") + }) + + // Now define a leafnode that has mqtt enabled, but no JS. This should still work. + lno := testMQTTDefaultOptions() + // Make sure jetstream is not explicitly defined here. 
+ lno.JetStream = false + switch resolution { + case 0: + // turn off jetstream in $G by adding another account and set mqtt domain option and set account default + lno.Accounts = append(lno.Accounts, NewAccount("unused-account")) + fallthrough + case 1: + lno.JsAccDefaultDomain = map[string]string{ + "$G": "", + } + case 2: + lno.JsAccDefaultDomain = map[string]string{ + "$G": o1.JetStreamDomain, + } + case 3: + // turn off jetstream in $G by adding another account and set mqtt domain option + lno.Accounts = append(lno.Accounts, NewAccount("unused-account")) + fallthrough + case 4: + // actual solution + lno.MQTT.JsDomain = o1.JetStreamDomain + case 5: + // set per account overwrite and disable js in $G + lno.Accounts = append(lno.Accounts, NewAccount("unused-account")) + lno.JsAccDefaultDomain = map[string]string{ + "$G": o1.JetStreamDomain, } } - return fmt.Errorf("no leader yet") - }) + // Whenever an account was added to disable JS in $G, enable it in the server + if len(lno.Accounts) > 0 { + lno.JetStream = true + lno.JetStreamDomain = "OTHER" + lno.StoreDir = createDir(t, "mqtt_js_ln") + } - // Now define a leafnode that has mqtt enabled, but no JS. This should still work. - lno := testMQTTDefaultOptions() - // Make sure jetstream is not explicitly defined here. 
- lno.JetStream = false - // Use RoutesFromStr() to make an array of urls - urls := RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d,nats://127.0.0.1:%d,nats://127.0.0.1:%d", - o1.LeafNode.Port, o2.LeafNode.Port, o3.LeafNode.Port)) - lno.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}} - ln := RunServer(lno) - defer ln.Shutdown() + // Use RoutesFromStr() to make an array of urls + urls := RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d,nats://127.0.0.1:%d,nats://127.0.0.1:%d", + o1.LeafNode.Port, o2.LeafNode.Port, o3.LeafNode.Port)) + lno.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}} - // Now connect to leafnode and subscribe - mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: true}, lno.MQTT.Host, lno.MQTT.Port) - defer mc.Close() - testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) - testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) - testMQTTFlush(t, mc, nil, rc) + ln := RunServer(lno) + defer testMQTTShutdownServer(ln) - connectAndPublish := func(o *Options) { - mp, rp := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) - defer mp.Close() - testMQTTCheckConnAck(t, rp, mqttConnAckRCConnectionAccepted, false) + // Now connect to leafnode and subscribe + mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: true}, lno.MQTT.Host, lno.MQTT.Port) + defer mc.Close() + testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc, rc, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) + testMQTTFlush(t, mc, nil, rc) - testMQTTPublish(t, mp, rp, 1, false, false, "foo", 1, []byte("msg")) + connectAndPublish := func(o *Options) { + mp, rp := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mp.Close() + testMQTTCheckConnAck(t, rp, mqttConnAckRCConnectionAccepted, false) + + testMQTTPublish(t, mp, rp, 1, false, false, "foo", 1, []byte("msg")) + } + // Connect a publisher from leafnode and publish, 
verify message is received. + connectAndPublish(lno) + testMQTTCheckPubMsg(t, mc, rc, "foo", mqttPubQos1, []byte("msg")) + + // Connect from one server in the cluster check it works from there too. + connectAndPublish(o3) + testMQTTCheckPubMsg(t, mc, rc, "foo", mqttPubQos1, []byte("msg")) + + // Connect from a server in the hub and subscribe + mc2, rc2 := testMQTTConnect(t, &mqttConnInfo{clientID: "sub2", cleanSess: true}, o2.MQTT.Host, o2.MQTT.Port) + defer mc2.Close() + testMQTTCheckConnAck(t, rc2, mqttConnAckRCConnectionAccepted, false) + testMQTTSub(t, 1, mc2, rc2, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) + testMQTTFlush(t, mc2, nil, rc2) + + // Connect a publisher from leafnode and publish, verify message is received. + connectAndPublish(lno) + testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg")) + + // Connect from one server in the cluster check it works from there too. + connectAndPublish(o1) + testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg")) } - // Connect a publisher from leafnode and publish, verify message is received. - connectAndPublish(lno) - testMQTTCheckPubMsg(t, mc, rc, "foo", mqttPubQos1, []byte("msg")) - - // Connect from one server in the cluster check it works from there too. - connectAndPublish(o3) - testMQTTCheckPubMsg(t, mc, rc, "foo", mqttPubQos1, []byte("msg")) - - // Connect from a server in the hub and subscribe - mc2, rc2 := testMQTTConnect(t, &mqttConnInfo{clientID: "sub2", cleanSess: true}, o2.MQTT.Host, o2.MQTT.Port) - defer mc2.Close() - testMQTTCheckConnAck(t, rc2, mqttConnAckRCConnectionAccepted, false) - testMQTTSub(t, 1, mc2, rc2, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1}) - testMQTTFlush(t, mc2, nil, rc2) - - // Connect a publisher from leafnode and publish, verify message is received. - connectAndPublish(lno) - testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg")) - - // Connect from one server in the cluster check it works from there too. 
- connectAndPublish(o1) - testMQTTCheckPubMsg(t, mc2, rc2, "foo", mqttPubQos1, []byte("msg")) + t.Run("backwards-compatibility-default-js-enabled-in-leaf", func(t *testing.T) { + test(0) // test with JsAccDefaultDomain set to default (pointing at hub) but jetstream enabled in leaf node too + }) + t.Run("backwards-compatibility-default-js-disabled-in-leaf", func(t *testing.T) { + // test with JsAccDefaultDomain set. Checks if it works with backwards compatibility code for empty domain + test(1) + }) + t.Run("backwards-compatibility-domain-js-disabled-in-leaf", func(t *testing.T) { + // test with JsAccDefaultDomain set. Checks if it works with backwards compatibility code for domain set + test(2) // test with domain set in mqtt client + }) + t.Run("mqtt-explicit-js-enabled-in-leaf", func(t *testing.T) { + test(3) // test with domain set in mqtt client (pointing at hub) but jetstream enabled in leaf node too + }) + t.Run("mqtt-explicit-js-disabled-in-leaf", func(t *testing.T) { + test(4) // test with domain set in mqtt client + }) + t.Run("backwards-compatibility-domain-js-enabled-in-leaf", func(t *testing.T) { + test(5) // test with JsAccDefaultDomain set to domain (pointing at hub) but jetstream enabled in leaf node too + }) } func TestMQTTImportExport(t *testing.T) { diff --git a/server/opts.go b/server/opts.go index 0d82f14d..4489af23 100644 --- a/server/opts.go +++ b/server/opts.go @@ -181,77 +181,79 @@ type RemoteLeafOpts struct { // NOTE: This structure is no longer used for monitoring endpoints // and json tags are deprecated and may be removed in the future. 
type Options struct { - ConfigFile string `json:"-"` - ServerName string `json:"server_name"` - Host string `json:"addr"` - Port int `json:"port"` - ClientAdvertise string `json:"-"` - Trace bool `json:"-"` - Debug bool `json:"-"` - TraceVerbose bool `json:"-"` - NoLog bool `json:"-"` - NoSigs bool `json:"-"` - NoSublistCache bool `json:"-"` - NoHeaderSupport bool `json:"-"` - DisableShortFirstPing bool `json:"-"` - Logtime bool `json:"-"` - MaxConn int `json:"max_connections"` - MaxSubs int `json:"max_subscriptions,omitempty"` - MaxSubTokens uint8 `json:"-"` - Nkeys []*NkeyUser `json:"-"` - Users []*User `json:"-"` - Accounts []*Account `json:"-"` - NoAuthUser string `json:"-"` - SystemAccount string `json:"-"` - NoSystemAccount bool `json:"-"` - AllowNewAccounts bool `json:"-"` - Username string `json:"-"` - Password string `json:"-"` - Authorization string `json:"-"` - PingInterval time.Duration `json:"ping_interval"` - MaxPingsOut int `json:"ping_max"` - HTTPHost string `json:"http_host"` - HTTPPort int `json:"http_port"` - HTTPBasePath string `json:"http_base_path"` - HTTPSPort int `json:"https_port"` - AuthTimeout float64 `json:"auth_timeout"` - MaxControlLine int32 `json:"max_control_line"` - MaxPayload int32 `json:"max_payload"` - MaxPending int64 `json:"max_pending"` - Cluster ClusterOpts `json:"cluster,omitempty"` - Gateway GatewayOpts `json:"gateway,omitempty"` - LeafNode LeafNodeOpts `json:"leaf,omitempty"` - JetStream bool `json:"jetstream"` - JetStreamMaxMemory int64 `json:"-"` - JetStreamMaxStore int64 `json:"-"` - JetStreamDomain string `json:"-"` - JetStreamKey string `json:"-"` - StoreDir string `json:"-"` - Websocket WebsocketOpts `json:"-"` - MQTT MQTTOpts `json:"-"` - ProfPort int `json:"-"` - PidFile string `json:"-"` - PortsFileDir string `json:"-"` - LogFile string `json:"-"` - LogSizeLimit int64 `json:"-"` - Syslog bool `json:"-"` - RemoteSyslog string `json:"-"` - Routes []*url.URL `json:"-"` - RoutesStr string `json:"-"` - TLSTimeout 
float64 `json:"tls_timeout"` - TLS bool `json:"-"` - TLSVerify bool `json:"-"` - TLSMap bool `json:"-"` - TLSCert string `json:"-"` - TLSKey string `json:"-"` - TLSCaCert string `json:"-"` - TLSConfig *tls.Config `json:"-"` - TLSPinnedCerts PinnedCertSet `json:"-"` - AllowNonTLS bool `json:"-"` - WriteDeadline time.Duration `json:"-"` - MaxClosedClients int `json:"-"` - LameDuckDuration time.Duration `json:"-"` - LameDuckGracePeriod time.Duration `json:"-"` + ConfigFile string `json:"-"` + ServerName string `json:"server_name"` + Host string `json:"addr"` + Port int `json:"port"` + ClientAdvertise string `json:"-"` + Trace bool `json:"-"` + Debug bool `json:"-"` + TraceVerbose bool `json:"-"` + NoLog bool `json:"-"` + NoSigs bool `json:"-"` + NoSublistCache bool `json:"-"` + NoHeaderSupport bool `json:"-"` + DisableShortFirstPing bool `json:"-"` + Logtime bool `json:"-"` + MaxConn int `json:"max_connections"` + MaxSubs int `json:"max_subscriptions,omitempty"` + MaxSubTokens uint8 `json:"-"` + Nkeys []*NkeyUser `json:"-"` + Users []*User `json:"-"` + Accounts []*Account `json:"-"` + NoAuthUser string `json:"-"` + SystemAccount string `json:"-"` + NoSystemAccount bool `json:"-"` + AllowNewAccounts bool `json:"-"` + Username string `json:"-"` + Password string `json:"-"` + Authorization string `json:"-"` + PingInterval time.Duration `json:"ping_interval"` + MaxPingsOut int `json:"ping_max"` + HTTPHost string `json:"http_host"` + HTTPPort int `json:"http_port"` + HTTPBasePath string `json:"http_base_path"` + HTTPSPort int `json:"https_port"` + AuthTimeout float64 `json:"auth_timeout"` + MaxControlLine int32 `json:"max_control_line"` + MaxPayload int32 `json:"max_payload"` + MaxPending int64 `json:"max_pending"` + Cluster ClusterOpts `json:"cluster,omitempty"` + Gateway GatewayOpts `json:"gateway,omitempty"` + LeafNode LeafNodeOpts `json:"leaf,omitempty"` + JetStream bool `json:"jetstream"` + JetStreamMaxMemory int64 `json:"-"` + JetStreamMaxStore int64 `json:"-"` + 
JetStreamDomain string `json:"-"` + JetStreamExtHint string `json:"-"` + JetStreamKey string `json:"-"` + StoreDir string `json:"-"` + JsAccDefaultDomain map[string]string `json:"-"` // account to domain name mapping + Websocket WebsocketOpts `json:"-"` + MQTT MQTTOpts `json:"-"` + ProfPort int `json:"-"` + PidFile string `json:"-"` + PortsFileDir string `json:"-"` + LogFile string `json:"-"` + LogSizeLimit int64 `json:"-"` + Syslog bool `json:"-"` + RemoteSyslog string `json:"-"` + Routes []*url.URL `json:"-"` + RoutesStr string `json:"-"` + TLSTimeout float64 `json:"tls_timeout"` + TLS bool `json:"-"` + TLSVerify bool `json:"-"` + TLSMap bool `json:"-"` + TLSCert string `json:"-"` + TLSKey string `json:"-"` + TLSCaCert string `json:"-"` + TLSConfig *tls.Config `json:"-"` + TLSPinnedCerts PinnedCertSet `json:"-"` + AllowNonTLS bool `json:"-"` + WriteDeadline time.Duration `json:"-"` + MaxClosedClients int `json:"-"` + LameDuckDuration time.Duration `json:"-"` + LameDuckGracePeriod time.Duration `json:"-"` // MaxTracedMsgLen is the maximum printable length for traced messages. MaxTracedMsgLen int `json:"-"` @@ -390,6 +392,9 @@ type MQTTOpts struct { Password string Token string + // JetStream domain mqtt is supposed to pick up + JsDomain string + // Timeout for the authentication process. 
AuthTimeout float64 @@ -1271,6 +1276,18 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error *errors = append(*errors, err) return } + case "default_js_domain": + vv, ok := v.(map[string]interface{}) + if !ok { + *errors = append(*errors, &configErr{tk, fmt.Sprintf("error default_js_domain config: unsupported type %T", v)}) + return + } + m := make(map[string]string) + for kk, kv := range vv { + _, v = unwrapValue(kv, &tk) + m[kk] = v.(string) + } + o.JsAccDefaultDomain = m default: if au := atomic.LoadInt32(&allowUnknownTopLevelField); au == 0 && !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -1699,6 +1716,8 @@ func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]e doEnable = mv.(bool) case "key", "ek", "encryption_key": opts.JetStreamKey = mv.(string) + case "extension_hint": + opts.JetStreamExtHint = mv.(string) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -3887,6 +3906,8 @@ func parseMQTT(v interface{}, o *Options, errors *[]error, warnings *[]error) er } else { o.MQTT.MaxAckPending = uint16(tmp) } + case "js_domain": + o.MQTT.JsDomain = mv.(string) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/raft.go b/server/raft.go index cb33b106..03b21a26 100644 --- a/server/raft.go +++ b/server/raft.go @@ -155,6 +155,7 @@ type raft struct { dflag bool pleader bool observer bool + extSt extensionState // Subjects for votes, updates, replays. psubj string @@ -325,7 +326,7 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer tmpfile.Close() os.Remove(tmpfile.Name()) - return writePeerState(cfg.Store, &peerState{knownPeers, expected}) + return writePeerState(cfg.Store, &peerState{knownPeers, expected, extUndetermined}) } // startRaftNode will start the raft node. 
@@ -382,6 +383,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) { leadc: make(chan bool, 8), stepdown: make(chan string, 8), observer: cfg.Observer, + extSt: ps.domainExt, } n.c.registerWithAccount(sacc) @@ -919,7 +921,7 @@ func (n *raft) InstallSnapshot(data []byte) error { snap := &snapshot{ lastTerm: term, lastIndex: n.applied, - peerstate: encodePeerState(&peerState{n.peerNames(), n.csz}), + peerstate: encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt}), data: data, } @@ -1502,6 +1504,13 @@ func (n *raft) isObserver() bool { return n.observer } +func (n *raft) setObserver(isObserver bool, extSt extensionState) { + n.Lock() + defer n.Unlock() + n.observer = isObserver + n.extSt = extSt +} + func (n *raft) runAsFollower() { for { elect := n.electTimer() @@ -2187,7 +2196,7 @@ func (n *raft) applyCommit(index uint64) error { n.qn = n.csz/2 + 1 } } - n.writePeerState(&peerState{n.peerNames(), n.csz}) + n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt}) // We pass these up as well. committed = append(committed, e) case EntryRemovePeer: @@ -2216,7 +2225,7 @@ func (n *raft) applyCommit(index uint64) error { } // Write out our new state. - n.writePeerState(&peerState{n.peerNames(), n.csz}) + n.writePeerState(&peerState{n.peerNames(), n.csz, n.extSt}) // We pass these up as well. 
committed = append(committed, e) } @@ -2903,13 +2912,22 @@ func (n *raft) sendAppendEntry(entries []*Entry) { n.sendRPC(n.asubj, n.areply, ae.buf) } +type extensionState uint16 + +const ( + extUndetermined = extensionState(iota) + extExtended + extNotExtended +) + type peerState struct { knownPeers []string clusterSize int + domainExt extensionState } func peerStateBufSize(ps *peerState) int { - return 4 + 4 + (8 * len(ps.knownPeers)) + return 4 + 4 + (idLen * len(ps.knownPeers)) + 2 } func encodePeerState(ps *peerState) []byte { @@ -2922,6 +2940,7 @@ func encodePeerState(ps *peerState) []byte { copy(buf[wi:], peer) wi += idLen } + le.PutUint16(buf[wi:], uint16(ps.domainExt)) return buf } @@ -2933,13 +2952,17 @@ func decodePeerState(buf []byte) (*peerState, error) { ps := &peerState{clusterSize: int(le.Uint32(buf[0:]))} expectedPeers := int(le.Uint32(buf[4:])) buf = buf[8:] - for i, ri, n := 0, 0, expectedPeers; i < n && ri < len(buf); i++ { + ri := 0 + for i, n := 0, expectedPeers; i < n && ri < len(buf); i++ { ps.knownPeers = append(ps.knownPeers, string(buf[ri:ri+idLen])) ri += idLen } if len(ps.knownPeers) != expectedPeers { return nil, errCorruptPeers } + if len(buf[ri:]) >= 2 { + ps.domainExt = extensionState(le.Uint16(buf[ri:])) + } return ps, nil } @@ -2954,7 +2977,7 @@ func (n *raft) peerNames() []string { func (n *raft) currentPeerState() *peerState { n.RLock() - ps := &peerState{n.peerNames(), n.csz} + ps := &peerState{n.peerNames(), n.csz, n.extSt} n.RUnlock() return ps } diff --git a/server/reload.go b/server/reload.go index 52d749a3..769c158e 100644 --- a/server/reload.go +++ b/server/reload.go @@ -885,7 +885,7 @@ func imposeOrder(value interface{}) error { sort.Strings(value.AllowedOrigins) case string, bool, uint8, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet, *URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList, - *OCSPConfig: + 
*OCSPConfig, map[string]string: // explicitly skipped types default: // this will fail during unit tests diff --git a/server/server.go b/server/server.go index 8537206f..c3f7ff49 100644 --- a/server/server.go +++ b/server/server.go @@ -1319,7 +1319,24 @@ func (s *Server) registerAccountNoLock(acc *Account) *Account { } acc.srv = s acc.updated = time.Now().UTC() + accName := acc.Name + jsEnabled := acc.jsLimits != nil acc.mu.Unlock() + + if opts := s.getOpts(); opts != nil && len(opts.JsAccDefaultDomain) > 0 { + if defDomain, ok := opts.JsAccDefaultDomain[accName]; ok { + if jsEnabled { + s.Warnf("Skipping Default Domain %q, set for JetStream enabled account %q", defDomain, accName) + } else if defDomain != _EMPTY_ { + dest := fmt.Sprintf(jsDomainAPI, defDomain) + s.Noticef("Adding default domain mapping %q -> %q to account %q %p", jsAllAPI, dest, accName, acc) + if err := acc.AddMapping(jsAllAPI, dest); err != nil { + s.Errorf("Error adding JetStream default domain mapping: %v", err) + } + } + } + } + s.accounts.Store(acc.Name, acc) s.tmpAccounts.Delete(acc.Name) s.enableAccountTracking(acc) @@ -1568,6 +1585,10 @@ func (s *Server) Start() { friendlyBytes(int64(MAX_PAYLOAD_MAX_SIZE))) } + if len(opts.JsAccDefaultDomain) > 0 { + s.Warnf("The option `default_js_domain` is a temporary backwards compatibility measure and will be removed") + } + // If we have a memory resolver, check the accounts here for validation exceptions. // This allows them to be logged right away vs when they are accessed via a client. 
if hasOperators && len(opts.resolverPreloads) > 0 { diff --git a/server/server_test.go b/server/server_test.go index fe390dce..630a305c 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -97,7 +97,7 @@ func LoadConfig(configFile string) (opts *Options) { if err != nil { panic(fmt.Sprintf("Error processing configuration file: %v", err)) } - opts.NoSigs, opts.NoLog = true, true + opts.NoSigs, opts.NoLog = true, opts.LogFile == _EMPTY_ return } diff --git a/server/sublist_test.go b/server/sublist_test.go index c6912f06..b8d3b8a2 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -223,6 +223,10 @@ func testSublistFullWildcard(t *testing.T, s *Sublist) { verifyLen(r.psubs, 2, t) verifyMember(r.psubs, lsub, t) verifyMember(r.psubs, fsub, t) + + r = s.Match("a.>") + verifyLen(r.psubs, 1, t) + verifyMember(r.psubs, fsub, t) } func TestSublistRemove(t *testing.T) {