mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Moved the JetStream logic for solicited leafnodes to after we receive first info.
We needed access to the other side's JetStream status. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
committed by
Ivan Kozlovic
parent
f5eb8bef89
commit
ea5cddd590
@@ -780,7 +780,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
|
||||
// If so add in a subject mapping that will allow local connected clients to reach us here as well.
|
||||
opts := s.getOpts()
|
||||
if opts.JetStreamDomain != _EMPTY_ {
|
||||
s.Debugf(" Enable Domain: %s", opts.JetStreamDomain)
|
||||
s.Noticef(" Enable Domain: %s", opts.JetStreamDomain)
|
||||
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
|
||||
if err := a.AddMapping(src, jsAllAPI); err != nil {
|
||||
s.Debugf("Error adding JetStream domain mapping: %v", err)
|
||||
|
||||
@@ -5789,7 +5789,7 @@ func TestJetStreamClusterSingleLeafNodeWithoutSharedSystemAccount(t *testing.T)
|
||||
|
||||
// Wait for a bit and make sure we only get one of these.
|
||||
// The HUB domain should be cut off by default.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
checkSubsPending(t, sub, 1)
|
||||
// Drain.
|
||||
for _, err := sub.NextMsg(0); err == nil; _, err = sub.NextMsg(0) {
|
||||
|
||||
@@ -157,7 +157,7 @@ func (s *Server) addInJSDenyAll(r *leafNodeCfg) {
|
||||
denyAll := []string{jscAllSubj, raftAllSubj, jsAllAPI}
|
||||
|
||||
s.Noticef("Sharing system account but utilizing separate JetStream Domains")
|
||||
s.Noticef("Adding deny of %+v for leafnode configuration that bridges system account", denyAll, r.LocalAccount)
|
||||
s.Noticef("Adding deny of %+v for leafnode configuration that bridges system account", denyAll)
|
||||
|
||||
r.DenyExports = append(r.DenyExports, denyAll...)
|
||||
r.DenyImports = append(r.DenyImports, denyAll...)
|
||||
@@ -580,6 +580,7 @@ func (s *Server) startLeafNodeAcceptLoop() {
|
||||
TLSVerify: tlsVerify,
|
||||
MaxPayload: s.info.MaxPayload, // TODO(dlc) - Allow override?
|
||||
Headers: s.supportsHeaders(),
|
||||
JetStream: opts.JetStream,
|
||||
Proto: 1, // Fixed for now.
|
||||
InfoOnConnect: true,
|
||||
}
|
||||
@@ -782,11 +783,9 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
|
||||
|
||||
// Determines if we are soliciting the connection or not.
|
||||
var solicited bool
|
||||
var acc, sysAcc *Account
|
||||
var hasSysShared bool
|
||||
var acc *Account
|
||||
|
||||
if remote != nil {
|
||||
hasSysShared, sysAcc = s.hasSystemRemoteLeaf(), s.SystemAccount()
|
||||
// TODO: Decide what should be the optimal behavior here.
|
||||
// For now, if lookup fails, we will constantly try
|
||||
// to recreate this LN connection.
|
||||
@@ -815,29 +814,6 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
|
||||
if remote != nil {
|
||||
solicited = true
|
||||
remote.Lock()
|
||||
// Check for JetStream semantics to deny the JetStream API as needed.
|
||||
// This is so that if JetStream is enabled on both sides we can separately address both.
|
||||
if acc != sysAcc {
|
||||
if hasSysShared {
|
||||
s.addInJSDeny(remote)
|
||||
} else {
|
||||
// Here we want to suppress if this local account has JS enabled.
|
||||
// This is regardless of whether or not this server is actually running JS.
|
||||
if acc != nil && acc.jetStreamConfigured() {
|
||||
s.addInJSDeny(remote)
|
||||
}
|
||||
}
|
||||
// If we have a specified JetStream domain we will want to add a mapping to
|
||||
// allow access cross domain for each non-system account.
|
||||
if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && acc.jetStreamConfigured() {
|
||||
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
|
||||
if err := acc.AddMapping(src, jsAllAPI); err != nil {
|
||||
c.Debugf("Error adding JetStream domain mapping: %v", err)
|
||||
}
|
||||
}
|
||||
} else if opts.JetStreamDomain != _EMPTY_ {
|
||||
s.addInJSDenyAll(remote)
|
||||
}
|
||||
c.leaf.remote = remote
|
||||
c.setPermissions(remote.perms)
|
||||
if !c.leaf.remote.Hub {
|
||||
@@ -856,6 +832,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
|
||||
var nonce [nonceLen]byte
|
||||
var info *Info
|
||||
|
||||
// Grab this before the client lock below.
|
||||
if !solicited {
|
||||
// Grab server variables
|
||||
s.mu.Lock()
|
||||
@@ -900,6 +877,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
|
||||
info.Nonce = string(c.nonce)
|
||||
info.CID = c.cid
|
||||
b, _ := json.Marshal(info)
|
||||
|
||||
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
|
||||
// We have to send from this go routine because we may
|
||||
// have to block for TLS handshake before we start our
|
||||
@@ -951,6 +929,10 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
|
||||
}
|
||||
|
||||
func (c *client) processLeafnodeInfo(info *Info) {
|
||||
s := c.srv
|
||||
opts := s.getOpts()
|
||||
hasSysShared, sysAcc := s.hasSystemRemoteLeaf(), s.SystemAccount()
|
||||
|
||||
c.mu.Lock()
|
||||
if c.leaf == nil || c.isClosed() {
|
||||
c.mu.Unlock()
|
||||
@@ -1006,6 +988,36 @@ func (c *client) processLeafnodeInfo(info *Info) {
|
||||
} else {
|
||||
c.leaf.remoteServer = info.Name
|
||||
}
|
||||
|
||||
// Check for JetStream semantics to deny the JetStream API as needed.
|
||||
// This is so that if JetStream is enabled on both sides we can separately address both.
|
||||
if remote, acc := c.leaf.remote, c.acc; remote != nil {
|
||||
remote.Lock()
|
||||
if acc != sysAcc {
|
||||
if hasSysShared {
|
||||
s.addInJSDeny(remote)
|
||||
} else {
|
||||
// Here we want to suppress if this local account has JS enabled.
|
||||
// This is regardless of whether or not this server is actually running JS.
|
||||
// We do consider this if the other side is not running JetStream.
|
||||
if acc != nil && acc.jetStreamConfigured() && info.JetStream {
|
||||
s.addInJSDeny(remote)
|
||||
}
|
||||
}
|
||||
// If we have a specified JetStream domain we will want to add a mapping to
|
||||
// allow access cross domain for each non-system account.
|
||||
if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && acc.jetStreamConfigured() {
|
||||
src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
|
||||
if err := acc.AddMapping(src, jsAllAPI); err != nil {
|
||||
c.Debugf("Error adding JetStream domain mapping: %v", err)
|
||||
}
|
||||
}
|
||||
} else if opts.JetStreamDomain != _EMPTY_ {
|
||||
s.addInJSDenyAll(remote)
|
||||
}
|
||||
c.setPermissions(remote.perms)
|
||||
remote.Unlock()
|
||||
}
|
||||
}
|
||||
// For both initial INFO and async INFO protocols, Possibly
|
||||
// update our list of remote leafnode URLs we can connect to.
|
||||
@@ -1043,7 +1055,6 @@ func (c *client) processLeafnodeInfo(info *Info) {
|
||||
resumeConnect = true
|
||||
}
|
||||
|
||||
s := c.srv
|
||||
// Check if we have the remote account information and if so make sure it's stored.
|
||||
if info.RemoteAccount != _EMPTY_ {
|
||||
s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
|
||||
|
||||
Reference in New Issue
Block a user