From ea5cddd5901c4fd12a8cad2c6443c8f1e361e008 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 6 May 2021 13:10:28 -0700 Subject: [PATCH] Moved the JetStream logic for solicited leafnodes to after we receive first info. We needed access to the other side's JetStream status. Signed-off-by: Derek Collison --- server/jetstream.go | 2 +- server/jetstream_cluster_test.go | 2 +- server/leafnode.go | 67 +++++++++++++++++++------------- 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index 00c60954..595092b4 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -780,7 +780,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error { // If so add in a subject mapping that will allow local connected clients to reach us here as well. opts := s.getOpts() if opts.JetStreamDomain != _EMPTY_ { - s.Debugf(" Enable Domain: %s", opts.JetStreamDomain) + s.Noticef(" Enable Domain: %s", opts.JetStreamDomain) src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain) if err := a.AddMapping(src, jsAllAPI); err != nil { s.Debugf("Error adding JetStream domain mapping: %v", err) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index c4a1b6c3..eae53d1f 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -5789,7 +5789,7 @@ func TestJetStreamClusterSingleLeafNodeWithoutSharedSystemAccount(t *testing.T) // Wait for a bit and make sure we only get one of these. // The HUB domain should be cut off by default. - time.Sleep(200 * time.Millisecond) + time.Sleep(250 * time.Millisecond) checkSubsPending(t, sub, 1) // Drain. for _, err := sub.NextMsg(0); err == nil; _, err = sub.NextMsg(0) { diff --git a/server/leafnode.go b/server/leafnode.go index 6e4ba4cd..6404381f 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -157,7 +157,7 @@ func (s *Server) addInJSDenyAll(r *leafNodeCfg) { denyAll := []string{jscAllSubj, raftAllSubj, jsAllAPI} s.Noticef("Sharing system account but utilizing separate JetStream Domains") - s.Noticef("Adding deny of %+v for leafnode configuration that bridges system account", denyAll, r.LocalAccount) + s.Noticef("Adding deny of %+v for leafnode configuration that bridges system account", denyAll) r.DenyExports = append(r.DenyExports, denyAll...) r.DenyImports = append(r.DenyImports, denyAll...) @@ -580,6 +580,7 @@ func (s *Server) startLeafNodeAcceptLoop() { TLSVerify: tlsVerify, MaxPayload: s.info.MaxPayload, // TODO(dlc) - Allow override? Headers: s.supportsHeaders(), + JetStream: opts.JetStream, Proto: 1, // Fixed for now. InfoOnConnect: true, } @@ -782,11 +783,9 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf // Determines if we are soliciting the connection or not. var solicited bool - var acc, sysAcc *Account - var hasSysShared bool + var acc *Account if remote != nil { - hasSysShared, sysAcc = s.hasSystemRemoteLeaf(), s.SystemAccount() // TODO: Decide what should be the optimal behavior here. // For now, if lookup fails, we will constantly try // to recreate this LN connection. @@ -815,29 +814,6 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf if remote != nil { solicited = true remote.Lock() - // Check for JetStream semantics to deny the JetStream API as needed. - // This is so that if JetStream is enabled on both sides we can separately address both. - if acc != sysAcc { - if hasSysShared { - s.addInJSDeny(remote) - } else { - // Here we want to suppress if this local account has JS enabled. - // This is regardless of whether or not this server is actually running JS. - if acc != nil && acc.jetStreamConfigured() { - s.addInJSDeny(remote) - } - } - // If we have a specified JetStream domain we will want to add a mapping to - // allow access cross domain for each non-system account. - if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && acc.jetStreamConfigured() { - src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain) - if err := acc.AddMapping(src, jsAllAPI); err != nil { - c.Debugf("Error adding JetStream domain mapping: %v", err) - } - } - } else if opts.JetStreamDomain != _EMPTY_ { - s.addInJSDenyAll(remote) - } c.leaf.remote = remote c.setPermissions(remote.perms) if !c.leaf.remote.Hub { @@ -856,6 +832,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf var nonce [nonceLen]byte var info *Info + // Grab this before the client lock below. if !solicited { // Grab server variables s.mu.Lock() @@ -900,6 +877,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf info.Nonce = string(c.nonce) info.CID = c.cid b, _ := json.Marshal(info) + pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)} // We have to send from this go routine because we may // have to block for TLS handshake before we start our @@ -951,6 +929,10 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf } func (c *client) processLeafnodeInfo(info *Info) { + s := c.srv + opts := s.getOpts() + hasSysShared, sysAcc := s.hasSystemRemoteLeaf(), s.SystemAccount() + c.mu.Lock() if c.leaf == nil || c.isClosed() { c.mu.Unlock() @@ -1006,6 +988,36 @@ func (c *client) processLeafnodeInfo(info *Info) { } else { c.leaf.remoteServer = info.Name } + + // Check for JetStream semantics to deny the JetStream API as needed. + // This is so that if JetStream is enabled on both sides we can separately address both. + if remote, acc := c.leaf.remote, c.acc; remote != nil { + remote.Lock() + if acc != sysAcc { + if hasSysShared { + s.addInJSDeny(remote) + } else { + // Here we want to suppress if this local account has JS enabled. + // This is regardless of whether or not this server is actually running JS. + // We do consider this if the other side is not running JetStream. + if acc != nil && acc.jetStreamConfigured() && info.JetStream { + s.addInJSDeny(remote) + } + } + // If we have a specified JetStream domain we will want to add a mapping to + // allow access cross domain for each non-system account. + if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && acc.jetStreamConfigured() { + src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain) + if err := acc.AddMapping(src, jsAllAPI); err != nil { + c.Debugf("Error adding JetStream domain mapping: %v", err) + } + } + } else if opts.JetStreamDomain != _EMPTY_ { + s.addInJSDenyAll(remote) + } + c.setPermissions(remote.perms) + remote.Unlock() + } } // For both initial INFO and async INFO protocols, Possibly // update our list of remote leafnode URLs we can connect to. @@ -1043,7 +1055,6 @@ func (c *client) processLeafnodeInfo(info *Info) { resumeConnect = true } - s := c.srv // Check if we have the remote account information and if so make sure it's stored. if info.RemoteAccount != _EMPTY_ { s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)