mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
[ADDED] Leafnode remote's Hub option
This allows a node that creates a remote LeafNode connection to act as it was the hub (of the hub and spoke topology). This is related to subscription interest propagation. Normally, a spoke (the one creating the remote LN connection) will forward only its local subscriptions and when receiving subscription interest would not try to forward to local cluster and/or gateways. If a remote has the Hub boolean set to true, even though the node is the one creating the remote LN connection, it will behave as if it was accepting that connection. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -3052,7 +3052,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
// these after everything else.
|
||||
switch sub.client.kind {
|
||||
case ROUTER:
|
||||
if (c.kind != ROUTER && !c.isSolicitedLeafNode()) || (flags&pmrAllowSendFromRouteToRoute != 0) {
|
||||
if (c.kind != ROUTER && !c.isSpokeLeafNode()) || (flags&pmrAllowSendFromRouteToRoute != 0) {
|
||||
c.addSubToRouteTargets(sub)
|
||||
}
|
||||
continue
|
||||
@@ -3064,7 +3064,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
// Leaf node delivery audience is different however.
|
||||
// Also leaf nodes are always no echo, so we make sure we are not
|
||||
// going to send back to ourselves here.
|
||||
if c != sub.client && (c.kind != ROUTER || !c.isSolicitedLeafNode()) {
|
||||
if c != sub.client && (c.kind != ROUTER || !c.isSpokeLeafNode()) {
|
||||
c.addSubToRouteTargets(sub)
|
||||
}
|
||||
continue
|
||||
|
||||
@@ -74,6 +74,11 @@ func (c *client) isSolicitedLeafNode() bool {
|
||||
return c.kind == LEAF && c.leaf.remote != nil
|
||||
}
|
||||
|
||||
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub.
|
||||
func (c *client) isSpokeLeafNode() bool {
|
||||
return c.kind == LEAF && c.leaf.remote != nil && !c.leaf.remote.Hub
|
||||
}
|
||||
|
||||
func (c *client) isUnsolicitedLeafNode() bool {
|
||||
return c.kind == LEAF && c.leaf.remote == nil
|
||||
}
|
||||
@@ -570,6 +575,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
|
||||
// Determines if we are soliciting the connection or not.
|
||||
var solicited bool
|
||||
var sendSysConnectEvent bool
|
||||
|
||||
c.mu.Lock()
|
||||
c.initClient()
|
||||
@@ -581,6 +587,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
remote.LocalAccount = globalAccountName
|
||||
}
|
||||
c.leaf.remote = remote
|
||||
sendSysConnectEvent = c.leaf.remote.Hub
|
||||
c.mu.Unlock()
|
||||
// TODO: Decide what should be the optimal behavior here.
|
||||
// For now, if lookup fails, we will constantly try
|
||||
@@ -785,6 +792,9 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
s.addLeafNodeConnection(c)
|
||||
s.initLeafNodeSmap(c)
|
||||
c.sendAllLeafSubs()
|
||||
if sendSysConnectEvent {
|
||||
s.sendLeafNodeConnect(c.acc)
|
||||
}
|
||||
}
|
||||
|
||||
return c
|
||||
@@ -1014,7 +1024,7 @@ func (s *Server) initLeafNodeSmap(c *client) int {
|
||||
acc.mu.Lock()
|
||||
accName := acc.Name
|
||||
// If we are solicited we only send interest for local clients.
|
||||
if c.isSolicitedLeafNode() {
|
||||
if c.isSpokeLeafNode() {
|
||||
acc.sl.localSubs(&subs)
|
||||
} else {
|
||||
acc.sl.All(&subs)
|
||||
@@ -1051,7 +1061,7 @@ func (s *Server) initLeafNodeSmap(c *client) int {
|
||||
}
|
||||
|
||||
applyGlobalRouting := s.gateway.enabled
|
||||
if c.isSolicitedLeafNode() {
|
||||
if c.isSpokeLeafNode() {
|
||||
// Add a fake subscription for this solicited leafnode connection
|
||||
// so that we can send back directly for mapped GW replies.
|
||||
c.srv.gwLeafSubs.Insert(&subscription{client: c, subject: []byte(gwReplyPrefix + ">")})
|
||||
@@ -1129,7 +1139,7 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
|
||||
|
||||
// If we are solicited make sure this is a local client or a non-solicited leaf node
|
||||
skind := sub.client.kind
|
||||
if c.isSolicitedLeafNode() && !(skind == CLIENT || (skind == LEAF && !sub.client.isSolicitedLeafNode())) {
|
||||
if c.isSpokeLeafNode() && !(skind == CLIENT || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -1335,7 +1345,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
atomic.StoreInt32(&osub.qw, sub.qw)
|
||||
acc.sl.UpdateRemoteQSub(osub)
|
||||
}
|
||||
solicited := c.isSolicitedLeafNode()
|
||||
spoke := c.isSpokeLeafNode()
|
||||
c.mu.Unlock()
|
||||
|
||||
if err := c.addShadowSubscriptions(acc, sub); err != nil {
|
||||
@@ -1345,7 +1355,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
// If we are not solicited, treat leaf node subscriptions similar to a
|
||||
// client subscription, meaning we forward them to routes, gateways and
|
||||
// other leaf nodes as needed.
|
||||
if !solicited {
|
||||
if !spoke {
|
||||
// If we are routing add to the route map for the associated account.
|
||||
srv.updateRouteSubscriptionMap(acc, sub, 1)
|
||||
if updateGWs {
|
||||
|
||||
@@ -1015,3 +1015,83 @@ func TestLeafNodeRemoteWrongPort(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeRemoteIsHub(t *testing.T) {
|
||||
oa := testDefaultOptionsForGateway("A")
|
||||
oa.Accounts = []*Account{NewAccount("sys")}
|
||||
oa.SystemAccount = "sys"
|
||||
sa := RunServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
lno := DefaultOptions()
|
||||
lno.LeafNode.Host = "127.0.0.1"
|
||||
lno.LeafNode.Port = -1
|
||||
ln := RunServer(lno)
|
||||
defer ln.Shutdown()
|
||||
|
||||
ob1 := testGatewayOptionsFromToWithServers(t, "B", "A", sa)
|
||||
ob1.Accounts = []*Account{NewAccount("sys")}
|
||||
ob1.SystemAccount = "sys"
|
||||
ob1.Cluster.Host = "127.0.0.1"
|
||||
ob1.Cluster.Port = -1
|
||||
ob1.LeafNode.Host = "127.0.0.1"
|
||||
ob1.LeafNode.Port = -1
|
||||
u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lno.LeafNode.Port))
|
||||
ob1.LeafNode.Remotes = []*RemoteLeafOpts{
|
||||
&RemoteLeafOpts{
|
||||
URLs: []*url.URL{u},
|
||||
Hub: true,
|
||||
},
|
||||
}
|
||||
sb1 := RunServer(ob1)
|
||||
defer sb1.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sb1, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sb1, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sa, 1, 2*time.Second)
|
||||
waitForInboundGateways(t, sa, 1, 2*time.Second)
|
||||
|
||||
checkLeafNodeConnected(t, sb1)
|
||||
|
||||
// For now, due to issue 977, let's restart the leafnode so that the
|
||||
// leafnode connect is propagated in the super-cluster.
|
||||
ln.Shutdown()
|
||||
ln = RunServer(lno)
|
||||
defer ln.Shutdown()
|
||||
checkLeafNodeConnected(t, sb1)
|
||||
|
||||
// Connect another server in cluster B
|
||||
ob2 := testGatewayOptionsFromToWithServers(t, "B", "A", sa)
|
||||
ob2.Accounts = []*Account{NewAccount("sys")}
|
||||
ob2.SystemAccount = "sys"
|
||||
ob2.Cluster.Host = "127.0.0.1"
|
||||
ob2.Cluster.Port = -1
|
||||
ob2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", ob1.Cluster.Port))
|
||||
sb2 := RunServer(ob2)
|
||||
defer sb2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, sb1, sb2)
|
||||
waitForOutboundGateways(t, sb2, 1, 2*time.Second)
|
||||
|
||||
// Create sub on "foo" connected to sa
|
||||
ncA := natsConnect(t, sa.ClientURL())
|
||||
defer ncA.Close()
|
||||
subFoo := natsSubSync(t, ncA, "foo")
|
||||
|
||||
// Create sub on "bar" connected to sb2
|
||||
ncB2 := natsConnect(t, sb2.ClientURL())
|
||||
defer ncB2.Close()
|
||||
subBar := natsSubSync(t, ncB2, "bar")
|
||||
|
||||
// Create pub connection on leafnode
|
||||
ncLN := natsConnect(t, ln.ClientURL())
|
||||
defer ncLN.Close()
|
||||
|
||||
// Publish on foo and make sure it is received.
|
||||
natsPub(t, ncLN, "foo", []byte("msg"))
|
||||
natsNexMsg(t, subFoo, time.Second)
|
||||
|
||||
// Publish on foo and make sure it is received.
|
||||
natsPub(t, ncLN, "bar", []byte("msg"))
|
||||
natsNexMsg(t, subBar, time.Second)
|
||||
}
|
||||
|
||||
@@ -136,6 +136,7 @@ type RemoteLeafOpts struct {
|
||||
TLS bool `json:"-"`
|
||||
TLSConfig *tls.Config `json:"-"`
|
||||
TLSTimeout float64 `json:"tls_timeout,omitempty"`
|
||||
Hub bool `json:"hub,omitempty"`
|
||||
}
|
||||
|
||||
// Options block for nats-server.
|
||||
@@ -1376,6 +1377,8 @@ func parseRemoteLeafNodes(v interface{}, errors *[]error, warnings *[]error) ([]
|
||||
} else {
|
||||
remote.TLSTimeout = float64(DEFAULT_LEAF_TLS_TIMEOUT)
|
||||
}
|
||||
case "hub":
|
||||
remote.Hub = v.(bool)
|
||||
default:
|
||||
if !tk.IsUsedVariable() {
|
||||
err := &unknownConfigFieldErr{
|
||||
|
||||
Reference in New Issue
Block a user