Changes to leafnodes to support multiple domains where the hub is JetStream enabled but the hub account is not, and the leafnode is.

We were incorrectly shutting things down via deny clauses when detecting the remote side/hub had JetStream capabilities.
This change moves that logic to the remote side and is signalled off the connect message which let's the remote side know
if the local leafnode account has JetStream enabled.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-06-06 12:15:36 -07:00
parent 82eb6b6d5e
commit 30fae4f960
4 changed files with 154 additions and 88 deletions

View File

@@ -1273,6 +1273,9 @@ func (js *jetStream) disableJetStream(jsa *jsAccount) error {
// jetStreamConfigured reports whether the account has JetStream configured, regardless of this
// servers JetStream status.
func (a *Account) jetStreamConfigured() bool {
if a == nil {
return false
}
a.mu.RLock()
defer a.mu.RUnlock()
return a.jsLimits != nil

View File

@@ -830,7 +830,7 @@ const badAPIRequestT = "Malformed JetStream API Request: %q"
func (a *Account) checkJetStream() (enabled, shouldError bool) {
a.mu.RLock()
defer a.mu.RUnlock()
return a.js != nil, a.nleafs == 0
return a.js != nil, a.nleafs+a.nrleafs == 0
}
// Request for current usage and limits for this account.
@@ -876,6 +876,7 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, subject, rep
if err != nil {
return
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), string(b))
}

View File

@@ -6460,6 +6460,34 @@ func TestJetStreamClusterDomainsAndSameNameSources(t *testing.T) {
}
}
// When a leafnode enables JS on an account that is not enabled on the remote cluster account this should
// still work. Early NGS beta testers etc.
func TestJetStreamClusterSingleLeafNodeEnablingJetStream(t *testing.T) {
tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: HUB, store_dir:", 1)
c := createJetStreamCluster(t, tmpl, "HUB", _EMPTY_, 3, 11322, true)
defer c.shutdown()
ln := c.createSingleLeafNodeNoSystemAccountAndEnablesJetStream()
defer ln.Shutdown()
// Check that we have JS in the $G account on the leafnode.
nc, js := jsClientConnect(t, ln)
defer nc.Close()
if _, err := js.AccountInfo(); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Connect our client to the "nojs" account in the cluster but make sure JS works since its enabled via the leafnode.
s := c.randomServer()
nc, js = jsClientConnect(t, s, nats.UserInfo("nojs", "p"))
defer nc.Close()
if _, err := js.AccountInfo(); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
func TestJetStreamClusterLeafDifferentAccounts(t *testing.T) {
c := createJetStreamCluster(t, jsClusterAccountsTempl, "HUB", _EMPTY_, 2, 33133, false)
defer c.shutdown()
@@ -7101,6 +7129,7 @@ var jsClusterAccountsTempl = `
accounts {
ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled }
TWO { users = [ { user: "two", pass: "p" } ]; jetstream: enabled }
NOJS { users = [ { user: "nojs", pass: "p" } ] }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`
@@ -7508,6 +7537,29 @@ func (c *cluster) createSingleLeafNodeNoSystemAccount() *Server {
return s
}
// This is tied to jsClusterAccountsTempl, so changes there to users needs to be reflected here.
func (c *cluster) createSingleLeafNodeNoSystemAccountAndEnablesJetStream() *Server {
as := c.randomServer()
lno := as.getOpts().LeafNode
ln := fmt.Sprintf("nats://nojs:p@%s:%d", lno.Host, lno.Port)
conf := fmt.Sprintf(jsClusterSingleLeafNodeLikeNGSTempl, createDir(c.t, JetStreamStoreDir), ln)
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
c.servers = append(c.servers, s)
c.opts = append(c.opts, o)
checkLeafNodeConnectedCount(c.t, as, 1)
return s
}
var jsClusterSingleLeafNodeLikeNGSTempl = `
listen: 127.0.0.1:-1
server_name: LNJS
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}
leaf { remotes [ { urls: [ %s ] } ] }
`
var jsClusterSingleLeafNodeTempl = `
listen: 127.0.0.1:-1
server_name: LNJS

View File

@@ -112,43 +112,54 @@ func (c *client) isHubLeafNode() bool {
// are sharing the system account and wanting to extend the JS domain.
// r lock should be held.
func (s *Server) addInJSDeny(r *leafNodeCfg) {
var hasDE, hasDI bool
s.addInJSDenyExport(r)
s.addInJSDenyImport(r)
}
// Will add in the deny export for JetStream on solicited connections if we
// detect we have multiple JetStream domains and we know our local account
// is JetStream enabled.
// r lock should be held.
func (s *Server) addInJSDenyExport(r *leafNodeCfg) {
for _, dsubj := range r.DenyExports {
if dsubj == jsAllAPI {
hasDE = true
break
}
}
for _, dsubj := range r.DenyImports {
if dsubj == jsAllAPI {
hasDI = true
break
return
}
}
var addedDeny bool
if !hasDE {
s.Noticef("Adding deny export of %q for leafnode configuration on %q that bridges system account", jsAllAPI, r.LocalAccount)
r.DenyExports = append(r.DenyExports, jsAllAPI)
addedDeny = true
}
if !hasDI {
s.Noticef("Adding deny import of %q for leafnode configuration on %q that bridges system account", jsAllAPI, r.LocalAccount)
r.DenyImports = append(r.DenyImports, jsAllAPI)
addedDeny = true
}
s.Noticef("Adding deny export of %q for leafnode configuration on %q that bridges system account", jsAllAPI, r.LocalAccount)
r.DenyExports = append(r.DenyExports, jsAllAPI)
// We added in some deny clauses here so need to regenerate the permissions etc.
if addedDeny {
perms := &Permissions{}
if len(r.DenyExports) > 0 {
perms.Publish = &SubjectPermission{Deny: r.DenyExports}
}
if len(r.DenyImports) > 0 {
perms.Subscribe = &SubjectPermission{Deny: r.DenyImports}
}
r.perms = perms
perms := &Permissions{}
perms.Publish = &SubjectPermission{Deny: r.DenyExports}
if len(r.DenyImports) > 0 {
perms.Subscribe = &SubjectPermission{Deny: r.DenyImports}
}
r.perms = perms
}
// Will add in the deny import for JetStream on solicited connections if we
// detect we have multiple JetStream domains and we know our local account
// is JetStream enabled.
// r lock should be held.
func (s *Server) addInJSDenyImport(r *leafNodeCfg) {
for _, dsubj := range r.DenyImports {
if dsubj == jsAllAPI {
return
}
}
s.Noticef("Adding deny import of %q for leafnode configuration on %q that bridges system account", jsAllAPI, r.LocalAccount)
r.DenyImports = append(r.DenyImports, jsAllAPI)
// We added in some deny clauses here so need to regenerate the permissions etc.
perms := &Permissions{}
perms.Subscribe = &SubjectPermission{Deny: r.DenyImports}
if len(r.DenyExports) > 0 {
perms.Publish = &SubjectPermission{Deny: r.DenyExports}
}
r.perms = perms
}
// Used for $SYS accounts when sharing but using separate JS domains.
@@ -636,17 +647,18 @@ var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-
func (c *client) sendLeafConnect(clusterName string, tlsRequired, headers bool) error {
// We support basic user/pass and operator based user JWT with signatures.
cinfo := leafConnectInfo{
TLS: tlsRequired,
ID: c.srv.info.ID,
Name: c.srv.info.Name,
Hub: c.leaf.remote.Hub,
Cluster: clusterName,
Headers: headers,
DenyPub: c.leaf.remote.DenyImports,
TLS: tlsRequired,
ID: c.srv.info.ID,
Name: c.srv.info.Name,
Hub: c.leaf.remote.Hub,
Cluster: clusterName,
Headers: headers,
JetStream: c.acc.jetStreamConfigured(),
DenyPub: c.leaf.remote.DenyImports,
}
// Check for credentials first, that will take precedence..
if creds := c.leaf.remote.Credentials; creds != "" {
if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
contents, err := ioutil.ReadFile(creds)
if err != nil {
@@ -996,28 +1008,34 @@ func (c *client) processLeafnodeInfo(info *Info) {
// This is so that if JetStream is enabled on both sides we can separately address both.
if remote, acc := c.leaf.remote, c.acc; remote != nil {
remote.Lock()
// JetStream checks for mappings and permissions updates.
hasJSDomain := opts.JetStreamDomain != _EMPTY_
if acc != sysAcc {
if hasSysShared {
s.addInJSDeny(remote)
} else {
// Here we want to suppress if this local account has JS enabled.
// This is regardless of whether or not this server is actually running JS.
// We do consider this if the other side is not running JetStream.
if acc != nil && acc.jetStreamConfigured() && info.JetStream {
s.addInJSDeny(remote)
// We only suppress export. But we do send an indication about our JetStream
// status in the connect and the hub side will suppress as well if the remote
// account also has JetStream enabled.
if acc.jetStreamConfigured() {
s.addInJSDenyExport(remote)
}
}
// If we have a specified JetStream domain we will want to add a mapping to
// allow access cross domain for each non-system account.
if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && acc.jetStreamConfigured() {
if hasJSDomain && acc.jetStreamConfigured() {
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
if err := acc.AddMapping(src, jsAllAPI); err != nil {
c.Debugf("Error adding JetStream domain mapping: %v", err)
}
}
} else if opts.JetStreamDomain != _EMPTY_ {
} else if hasJSDomain {
s.addInJSDenyAll(remote)
}
c.setPermissions(remote.perms)
remote.Unlock()
}
@@ -1039,9 +1057,15 @@ func (c *client) processLeafnodeInfo(info *Info) {
// Check if we have local deny clauses that we need to merge.
if remote := c.leaf.remote; remote != nil {
if len(remote.DenyExports) > 0 {
if perms.Publish == nil {
perms.Publish = &SubjectPermission{}
}
perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
}
if len(remote.DenyImports) > 0 {
if perms.Subscribe == nil {
perms.Subscribe = &SubjectPermission{}
}
perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
}
}
@@ -1228,21 +1252,24 @@ func (s *Server) removeLeafNodeConnection(c *client) {
s.removeFromTempClients(cid)
}
// Connect information for solicited leafnodes.
type leafConnectInfo struct {
JWT string `json:"jwt,omitempty"`
Sig string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
TLS bool `json:"tls_required"`
Comp bool `json:"compression,omitempty"`
ID string `json:"server_id,omitempty"`
Name string `json:"name,omitempty"`
Hub bool `json:"is_hub,omitempty"`
Cluster string `json:"cluster,omitempty"`
Headers bool `json:"headers,omitempty"`
JWT string `json:"jwt,omitempty"`
Sig string `json:"sig,omitempty"`
User string `json:"user,omitempty"`
Pass string `json:"pass,omitempty"`
TLS bool `json:"tls_required"`
Comp bool `json:"compression,omitempty"`
ID string `json:"server_id,omitempty"`
Name string `json:"name,omitempty"`
Hub bool `json:"is_hub,omitempty"`
Cluster string `json:"cluster,omitempty"`
Headers bool `json:"headers,omitempty"`
JetStream bool `json:"jetstream,omitempty"`
DenyPub []string `json:"deny_pub,omitempty"`
// Just used to detect wrong connection attempts.
Gateway string `json:"gateway,omitempty"`
DenyPub []string `json:"deny_pub,omitempty"`
Gateway string `json:"gateway,omitempty"`
}
// processLeafNodeConnect will process the inbound connect args.
@@ -1265,7 +1292,7 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
// Reject if this has Gateway which means that it would be from a gateway
// connection that incorrectly connects to the leafnode port.
if proto.Gateway != "" {
if proto.Gateway != _EMPTY_ {
errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
c.Errorf(errTxt)
c.sendErr(errTxt)
@@ -1303,7 +1330,14 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
}
// Check for JetStream domain
doDomainMappings := opts.JetStreamDomain != _EMPTY_ && c.acc != sysAcc && c.acc.jetStreamConfigured()
jsConfigured := c.acc.jetStreamConfigured()
doDomainMappings := opts.JetStreamDomain != _EMPTY_ && c.acc != sysAcc && jsConfigured
// If we have JS enabled and the other side does as well we need to add in an import deny clause.
if jsConfigured && proto.JetStream {
// We should never have existing perms here, if that changes this needs to be reworked.
c.setPermissions(&Permissions{Publish: &SubjectPermission{Deny: []string{jsAllAPI}}})
}
// Set the Ping timer
s.setFirstPingTimer(c)
@@ -2022,25 +2056,7 @@ func (c *client) processInboundLeafMsg(msg []byte) {
c.in.msgs++
c.in.bytes += int32(len(msg) - LEN_CR_LF)
// Check pub permissions
if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) {
subject := c.pa.subject
// If this subject was mapped we need to check the original subject, not the new one.
if len(c.pa.mapped) > 0 {
subject = c.pa.mapped
}
if !c.pubAllowed(string(subject)) {
if c.isHubLeafNode() {
c.leafPubPermViolation(subject)
} else {
c.Debugf("Not permitted to receive from %q", subject)
}
return
}
}
srv := c.srv
acc := c.acc
srv, acc, subject := c.srv, c.acc, string(c.pa.subject)
// Mostly under testing scenarios.
if srv == nil || acc == nil {
@@ -2054,7 +2070,7 @@ func (c *client) processInboundLeafMsg(msg []byte) {
genid := atomic.LoadUint64(&c.acc.sl.genid)
if genid == c.in.genid && c.in.results != nil {
r, ok = c.in.results[string(c.pa.subject)]
r, ok = c.in.results[subject]
} else {
// Reset our L1 completely.
c.in.results = make(map[string]*SublistResult)
@@ -2063,13 +2079,13 @@ func (c *client) processInboundLeafMsg(msg []byte) {
// Go back to the sublist data structure.
if !ok {
r = c.acc.sl.Match(string(c.pa.subject))
c.in.results[string(c.pa.subject)] = r
r = c.acc.sl.Match(subject)
c.in.results[subject] = r
// Prune the results cache. Keeps us from unbounded growth. Random delete.
if len(c.in.results) > maxResultCacheSize {
n := 0
for subject := range c.in.results {
delete(c.in.results, subject)
for subj := range c.in.results {
delete(c.in.results, subj)
if n++; n > pruneSize {
break
}
@@ -2101,12 +2117,6 @@ func (c *client) processInboundLeafMsg(msg []byte) {
}
}
// Handles a publish permission violation.
// See leafPermViolation() for details.
func (c *client) leafPubPermViolation(subj []byte) {
c.leafPermViolation(true, subj)
}
// Handles a subscription permission violation.
// See leafPermViolation() for details.
func (c *client) leafSubPermViolation(subj []byte) {