mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Have hub role sent to accepting side and adapt to be a spoke
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -58,6 +58,9 @@ type leaf struct {
|
||||
smap map[string]int32
|
||||
// We have any auth stuff here for solicited connections.
|
||||
remote *leafNodeCfg
|
||||
// isSpoke tells us what role we are playing.
|
||||
// Used when we receive a connection but otherside tells us they are a hub.
|
||||
isSpoke bool
|
||||
}
|
||||
|
||||
// Used for remote (solicited) leafnodes.
|
||||
@@ -78,9 +81,10 @@ func (c *client) isSolicitedLeafNode() bool {
|
||||
return c.kind == LEAF && c.leaf.remote != nil
|
||||
}
|
||||
|
||||
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub.
|
||||
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
|
||||
// connection leafnode where the otherside has declared itself to be the hub.
|
||||
func (c *client) isSpokeLeafNode() bool {
|
||||
return c.kind == LEAF && c.leaf.remote != nil && !c.leaf.remote.Hub
|
||||
return c.kind == LEAF && c.leaf.isSpoke
|
||||
}
|
||||
|
||||
func (c *client) isUnsolicitedLeafNode() bool {
|
||||
@@ -443,6 +447,7 @@ func (c *client) sendLeafConnect(tlsRequired bool) {
|
||||
cinfo := leafConnectInfo{
|
||||
TLS: tlsRequired,
|
||||
Name: c.srv.info.ID,
|
||||
Hub: c.leaf.remote.Hub,
|
||||
}
|
||||
|
||||
// Check for credentials first, that will take precedence..
|
||||
@@ -600,7 +605,11 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
}
|
||||
c.leaf.remote = remote
|
||||
c.setPermissions(remote.perms)
|
||||
sendSysConnectEvent = c.leaf.remote.Hub
|
||||
if c.leaf.remote.Hub {
|
||||
sendSysConnectEvent = true
|
||||
} else {
|
||||
c.leaf.isSpoke = true
|
||||
}
|
||||
c.mu.Unlock()
|
||||
// TODO: Decide what should be the optimal behavior here.
|
||||
// For now, if lookup fails, we will constantly try
|
||||
@@ -962,7 +971,7 @@ type leafConnectInfo struct {
|
||||
TLS bool `json:"tls_required"`
|
||||
Comp bool `json:"compression,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
|
||||
Hub bool `json:"is_hub,omitempty"`
|
||||
// Just used to detect wrong connection attempts.
|
||||
Gateway string `json:"gateway,omitempty"`
|
||||
}
|
||||
@@ -1000,6 +1009,11 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
c.opts.Echo = false
|
||||
c.opts.Pedantic = false
|
||||
|
||||
// If the other side has declared itself a hub, so we will take on the spoke role.
|
||||
if proto.Hub {
|
||||
c.leaf.isSpoke = true
|
||||
}
|
||||
|
||||
// Create and initialize the smap since we know our bound account now.
|
||||
lm := s.initLeafNodeSmap(c)
|
||||
// We are good to go, send over all the bound account subscriptions.
|
||||
|
||||
@@ -1360,6 +1360,7 @@ func TestLeafNodeLoopDetectedOnAcceptSide(t *testing.T) {
|
||||
|
||||
func TestLeafNodeHubWithGateways(t *testing.T) {
|
||||
ao := DefaultOptions()
|
||||
ao.ServerName = "A"
|
||||
ao.LeafNode.Host = "127.0.0.1"
|
||||
ao.LeafNode.Port = -1
|
||||
a := RunServer(ao)
|
||||
@@ -1368,6 +1369,7 @@ func TestLeafNodeHubWithGateways(t *testing.T) {
|
||||
ua, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ao.LeafNode.Port))
|
||||
|
||||
bo := testDefaultOptionsForGateway("B")
|
||||
bo.ServerName = "B"
|
||||
bo.Accounts = []*Account{NewAccount("SYS")}
|
||||
bo.SystemAccount = "SYS"
|
||||
bo.LeafNode.ReconnectInterval = 5 * time.Millisecond
|
||||
@@ -1381,6 +1383,7 @@ func TestLeafNodeHubWithGateways(t *testing.T) {
|
||||
defer b.Shutdown()
|
||||
|
||||
do := DefaultOptions()
|
||||
do.ServerName = "D"
|
||||
do.LeafNode.Host = "127.0.0.1"
|
||||
do.LeafNode.Port = -1
|
||||
d := RunServer(do)
|
||||
@@ -1389,6 +1392,7 @@ func TestLeafNodeHubWithGateways(t *testing.T) {
|
||||
ud, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", do.LeafNode.Port))
|
||||
|
||||
co := testGatewayOptionsFromToWithServers(t, "C", "B", b)
|
||||
co.ServerName = "C"
|
||||
co.Accounts = []*Account{NewAccount("SYS")}
|
||||
co.SystemAccount = "SYS"
|
||||
co.LeafNode.ReconnectInterval = 5 * time.Millisecond
|
||||
@@ -1420,7 +1424,7 @@ func TestLeafNodeHubWithGateways(t *testing.T) {
|
||||
// Wait for interest to be registered on A.
|
||||
checkExpectedSubs(t, subsOnABefore+1, a)
|
||||
|
||||
// Create requestor on A and send the request, expect reply.
|
||||
// Create requestor on A and send the request, expect a reply.
|
||||
ncA := natsConnect(t, a.ClientURL())
|
||||
defer ncA.Close()
|
||||
if msg, err := ncA.Request("service", []byte("request"), time.Second); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user