mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 18:50:41 -07:00
Merge pull request #1327 from nats-io/leaf_staggered
This commit allows new servers in a supercluster to be informed of accounts with leafnodes.
This commit is contained in:
@@ -398,9 +398,11 @@ func (a *Account) addClient(c *client) int {
|
||||
}
|
||||
}
|
||||
a.mu.Unlock()
|
||||
if c != nil && c.srv != nil && a != c.srv.globalAccount() && added {
|
||||
|
||||
if c != nil && c.srv != nil && added {
|
||||
c.srv.accConnsUpdate(a)
|
||||
}
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
|
||||
@@ -642,6 +642,7 @@ func (s *Server) updateRemoteServer(ms *ServerInfo) {
|
||||
su := s.sys.servers[ms.ID]
|
||||
if su == nil {
|
||||
s.sys.servers[ms.ID] = &serverUpdate{ms.Seq, time.Now()}
|
||||
s.processNewServer(ms)
|
||||
} else {
|
||||
// Should always be going up.
|
||||
if ms.Seq <= su.seq {
|
||||
@@ -653,6 +654,33 @@ func (s *Server) updateRemoteServer(ms *ServerInfo) {
|
||||
}
|
||||
}
|
||||
|
||||
// processNewServer will hold any logic we want to use when we discover a new server.
|
||||
// Lock should be held upon entry.
|
||||
func (s *Server) processNewServer(ms *ServerInfo) {
|
||||
// Right now we only check if we have leafnode servers and if so send another
|
||||
// connect update to make sure they switch this account to interest only mode.
|
||||
s.ensureGWsInterestOnlyForLeafNodes()
|
||||
}
|
||||
|
||||
// If GW is enabled on this server and there are any leaf node connections,
|
||||
// this function will send a LeafNode connect system event to the super cluster
|
||||
// to ensure that the GWs are in interest-only mode for this account.
|
||||
// Lock should be held upon entry.
|
||||
// TODO(dlc) - this will cause this account to be loaded on all servers. Need a better
|
||||
// way with GW2.
|
||||
func (s *Server) ensureGWsInterestOnlyForLeafNodes() {
|
||||
if !s.gateway.enabled || len(s.leafs) == 0 {
|
||||
return
|
||||
}
|
||||
sent := make(map[*Account]bool, len(s.leafs))
|
||||
for _, c := range s.leafs {
|
||||
if !sent[c.acc] {
|
||||
s.sendLeafNodeConnectMsg(c.acc.Name)
|
||||
sent[c.acc] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// shutdownEventing will clean up all eventing state.
|
||||
func (s *Server) shutdownEventing() {
|
||||
if !s.eventsRunning() {
|
||||
@@ -708,8 +736,7 @@ func (s *Server) connsRequest(sub *subscription, _ *client, subject, reply strin
|
||||
}
|
||||
}
|
||||
|
||||
// leafNodeConnected is an event we will receive when a leaf node for a given account
|
||||
// connects.
|
||||
// leafNodeConnected is an event we will receive when a leaf node for a given account connects.
|
||||
func (s *Server) leafNodeConnected(sub *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
m := accNumConnsReq{}
|
||||
if err := json.Unmarshal(msg, &m); err != nil {
|
||||
@@ -805,19 +832,25 @@ func (s *Server) sendLeafNodeConnect(a *Account) {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
subj := fmt.Sprintf(leafNodeConnectEventSubj, a.Name)
|
||||
m := accNumConnsReq{Account: a.Name}
|
||||
s.sendInternalMsg(subj, "", &m.Server, &m)
|
||||
s.sendLeafNodeConnectMsg(a.Name)
|
||||
s.mu.Unlock()
|
||||
|
||||
s.switchAccountToInterestMode(a.Name)
|
||||
}
|
||||
|
||||
// Send the leafnode connect message.
|
||||
// Lock should be held.
|
||||
func (s *Server) sendLeafNodeConnectMsg(accName string) {
|
||||
subj := fmt.Sprintf(leafNodeConnectEventSubj, accName)
|
||||
m := accNumConnsReq{Account: accName}
|
||||
s.sendInternalMsg(subj, "", &m.Server, &m)
|
||||
}
|
||||
|
||||
// sendAccConnsUpdate is called to send out our information on the
|
||||
// account's local connections.
|
||||
// Lock should be held on entry.
|
||||
func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
|
||||
if !s.eventsEnabled() || a == nil || a == s.gacc {
|
||||
if !s.eventsEnabled() || a == nil {
|
||||
return
|
||||
}
|
||||
a.mu.RLock()
|
||||
|
||||
@@ -1077,6 +1077,12 @@ func (c *client) processGatewayInfo(info *Info) {
|
||||
// Send back to the server that initiated this gateway connection the
|
||||
// list of all remote gateways known on this server.
|
||||
s.gossipGatewaysToInboundGateway(info.Gateway, c)
|
||||
|
||||
// Now make sure if we have any knowledge of connected leafnodes that we resend the
|
||||
// connect events to switch those accounts into interest only mode.
|
||||
s.mu.Lock()
|
||||
s.ensureGWsInterestOnlyForLeafNodes()
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -861,6 +861,15 @@ func (s *Server) setSystemAccount(acc *Account) error {
|
||||
// Send out statsz updates periodically.
|
||||
s.wrapChk(s.startStatszTimer)()
|
||||
|
||||
// If we have existing accounts make sure we enable account tracking.
|
||||
s.mu.Lock()
|
||||
s.accounts.Range(func(k, v interface{}) bool {
|
||||
acc := v.(*Account)
|
||||
s.enableAccountTracking(acc)
|
||||
return true
|
||||
})
|
||||
s.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1153,7 +1162,7 @@ func (s *Server) Start() {
|
||||
return
|
||||
}
|
||||
|
||||
// Setup system account which will start eventing stack.
|
||||
// Setup system account which will start the eventing stack.
|
||||
if sa := opts.SystemAccount; sa != _EMPTY_ {
|
||||
if err := s.SetSystemAccount(sa); err != nil {
|
||||
s.Fatalf("Can't set system account: %v", err)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2019 The NATS Authors
|
||||
// Copyright 2019-2020 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -659,12 +659,12 @@ func waitForOutboundGateways(t *testing.T, s *server.Server, expected int, timeo
|
||||
// Will have Gateways and Leaf Node connections active.
|
||||
func createClusterWithName(t *testing.T, clusterName string, numServers int, connectTo ...*cluster) *cluster {
|
||||
t.Helper()
|
||||
return createClusterEx(t, false, clusterName, numServers, connectTo...)
|
||||
return createClusterEx(t, false, 5*time.Millisecond, true, clusterName, numServers, connectTo...)
|
||||
}
|
||||
|
||||
// Creates a cluster and optionally additional accounts and users.
|
||||
// Will have Gateways and Leaf Node connections active.
|
||||
func createClusterEx(t *testing.T, doAccounts bool, clusterName string, numServers int, connectTo ...*cluster) *cluster {
|
||||
func createClusterEx(t *testing.T, doAccounts bool, gwSolicit time.Duration, waitOnGWs bool, clusterName string, numServers int, connectTo ...*cluster) *cluster {
|
||||
t.Helper()
|
||||
|
||||
if clusterName == "" || numServers < 1 {
|
||||
@@ -702,7 +702,10 @@ func createClusterEx(t *testing.T, doAccounts bool, clusterName string, numServe
|
||||
}
|
||||
|
||||
bindGlobal := func(s *server.Server) {
|
||||
ngs, _ := s.LookupAccount("NGS")
|
||||
ngs, err := s.LookupAccount("NGS")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Bind global to service import
|
||||
gacc, _ := s.LookupAccount("$G")
|
||||
gacc.AddServiceImport(ngs, "ngs.usage", "ngs.usage.$G")
|
||||
@@ -718,7 +721,7 @@ func createClusterEx(t *testing.T, doAccounts bool, clusterName string, numServe
|
||||
}
|
||||
|
||||
// Make the GWs form faster for the tests.
|
||||
server.SetGatewaysSolicitDelay(5 * time.Millisecond)
|
||||
server.SetGatewaysSolicitDelay(gwSolicit)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
|
||||
// Create seed first.
|
||||
@@ -757,10 +760,12 @@ func createClusterEx(t *testing.T, doAccounts bool, clusterName string, numServe
|
||||
}
|
||||
checkClusterFormed(t, c.servers...)
|
||||
|
||||
// Wait on gateway connections if we were asked to connect to other gateways.
|
||||
if numGWs := len(connectTo); numGWs > 0 {
|
||||
for _, s := range c.servers {
|
||||
waitForOutboundGateways(t, s, numGWs, 2*time.Second)
|
||||
if waitOnGWs {
|
||||
// Wait on gateway connections if we were asked to connect to other gateways.
|
||||
if numGWs := len(connectTo); numGWs > 0 {
|
||||
for _, s := range c.servers {
|
||||
waitForOutboundGateways(t, s, numGWs, 2*time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -940,6 +945,100 @@ func TestLeafNodeWithRouteAndGateway(t *testing.T) {
|
||||
checkLmsg(t, matches[0], "bar", "", "2", "OK")
|
||||
}
|
||||
|
||||
// This will test that we propagate interest only mode after a leafnode
|
||||
// has been established and a new server joins a remote cluster.
|
||||
func TestLeafNodeWithGatewaysAndStaggeredStart(t *testing.T) {
|
||||
ca := createClusterWithName(t, "A", 3)
|
||||
defer shutdownCluster(ca)
|
||||
|
||||
// Create the leafnode on a server in cluster A.
|
||||
opts := ca.opts[0]
|
||||
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc.Close()
|
||||
|
||||
leafSend, leafExpect := setupLeaf(t, lc, 3)
|
||||
leafSend("PING\r\n")
|
||||
leafExpect(pongRe)
|
||||
|
||||
// Now setup the cluster B.
|
||||
cb := createClusterWithName(t, "B", 3, ca)
|
||||
defer shutdownCluster(cb)
|
||||
|
||||
// Create client on a server in cluster B
|
||||
opts = cb.opts[0]
|
||||
c := createClientConn(t, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
|
||||
send, expect := setupConn(t, c)
|
||||
send("PING\r\n")
|
||||
expect(pongRe)
|
||||
|
||||
// Make sure we see interest graph propagation on the leaf node
|
||||
// connection. This is required since leaf nodes only send data
|
||||
// in the presence of interest.
|
||||
send("SUB foo 1\r\nPING\r\n")
|
||||
expect(pongRe)
|
||||
leafExpect(lsubRe)
|
||||
}
|
||||
|
||||
// This will test that we propagate interest only mode after a leafnode
|
||||
// has been established and a server is restarted..
|
||||
func TestLeafNodeWithGatewaysServerRestart(t *testing.T) {
|
||||
ca := createClusterWithName(t, "A", 3)
|
||||
defer shutdownCluster(ca)
|
||||
|
||||
// Now setup the cluster B.
|
||||
cb := createClusterWithName(t, "B", 3, ca)
|
||||
defer shutdownCluster(cb)
|
||||
|
||||
// Create the leafnode on a server in cluster B.
|
||||
opts := cb.opts[1]
|
||||
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc.Close()
|
||||
|
||||
leafSend, leafExpect := setupLeaf(t, lc, 3)
|
||||
leafSend("PING\r\n")
|
||||
leafExpect(pongRe)
|
||||
|
||||
// Create client on a server in cluster A
|
||||
opts = ca.opts[1]
|
||||
c := createClientConn(t, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
|
||||
send, expect := setupConn(t, c)
|
||||
send("PING\r\n")
|
||||
expect(pongRe)
|
||||
|
||||
// Make sure we see interest graph propagation on the leaf node
|
||||
// connection. This is required since leaf nodes only send data
|
||||
// in the presence of interest.
|
||||
send("SUB foo 1\r\nPING\r\n")
|
||||
expect(pongRe)
|
||||
leafExpect(lsubRe)
|
||||
|
||||
// Close old leaf connection and simulate a reconnect.
|
||||
lc.Close()
|
||||
|
||||
// Shutdown and recreate B and the leafnode connection to it.
|
||||
shutdownCluster(cb)
|
||||
|
||||
// Create new cluster with longer solicit and don't wait for GW connect.
|
||||
cb = createClusterEx(t, false, 500*time.Millisecond, false, "B", 1, ca)
|
||||
defer shutdownCluster(cb)
|
||||
|
||||
opts = cb.opts[0]
|
||||
lc = createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc.Close()
|
||||
|
||||
_, leafExpect = setupLeaf(t, lc, 3)
|
||||
|
||||
// Now wait on GW solicit to fire
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// We should see the interest for 'foo' here.
|
||||
leafExpect(lsubRe)
|
||||
}
|
||||
|
||||
func TestLeafNodeLocalizedDQ(t *testing.T) {
|
||||
s, opts := runLeafServer()
|
||||
defer s.Shutdown()
|
||||
@@ -2345,12 +2444,10 @@ func TestLeafNodeSendsRemoteSubsOnConnect(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLeafNodeServiceImportLikeNGS(t *testing.T) {
|
||||
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
|
||||
ca := createClusterEx(t, true, "A", 3)
|
||||
gwSolicit := 10 * time.Millisecond
|
||||
ca := createClusterEx(t, true, gwSolicit, true, "A", 3)
|
||||
defer shutdownCluster(ca)
|
||||
cb := createClusterEx(t, true, "B", 3, ca)
|
||||
cb := createClusterEx(t, true, gwSolicit, true, "B", 3, ca)
|
||||
defer shutdownCluster(cb)
|
||||
|
||||
// Hang a responder off of cluster A.
|
||||
@@ -2465,14 +2562,12 @@ func TestLeafNodeSendsAccountingEvents(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLeafNodeDistributedQueueAcrossGWs(t *testing.T) {
|
||||
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
|
||||
ca := createClusterEx(t, true, "A", 3)
|
||||
gwSolicit := 10 * time.Millisecond
|
||||
ca := createClusterEx(t, true, gwSolicit, true, "A", 3)
|
||||
defer shutdownCluster(ca)
|
||||
cb := createClusterEx(t, true, "B", 3, ca)
|
||||
cb := createClusterEx(t, true, gwSolicit, true, "B", 3, ca)
|
||||
defer shutdownCluster(cb)
|
||||
cc := createClusterEx(t, true, "C", 3, ca, cb)
|
||||
cc := createClusterEx(t, true, gwSolicit, true, "C", 3, ca, cb)
|
||||
defer shutdownCluster(cc)
|
||||
|
||||
// Create queue subscribers
|
||||
@@ -2554,12 +2649,10 @@ func TestLeafNodeDistributedQueueAcrossGWs(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLeafNodeDistributedQueueEvenly(t *testing.T) {
|
||||
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
|
||||
ca := createClusterEx(t, true, "A", 3)
|
||||
gwSolicit := 10 * time.Millisecond
|
||||
ca := createClusterEx(t, true, gwSolicit, true, "A", 3)
|
||||
defer shutdownCluster(ca)
|
||||
cb := createClusterEx(t, true, "B", 3, ca)
|
||||
cb := createClusterEx(t, true, gwSolicit, true, "B", 3, ca)
|
||||
defer shutdownCluster(cb)
|
||||
|
||||
// Create queue subscribers
|
||||
|
||||
@@ -60,7 +60,7 @@ func createSuperCluster(t *testing.T, numServersPer, numClusters int) *superclus
|
||||
|
||||
for i := 0; i < numClusters; i++ {
|
||||
// Pick cluster name and setup default accounts.
|
||||
c := createClusterEx(t, true, randClusterName(), numServersPer, clusters...)
|
||||
c := createClusterEx(t, true, 5*time.Millisecond, true, randClusterName(), numServersPer, clusters...)
|
||||
clusters = append(clusters, c)
|
||||
}
|
||||
return &supercluster{clusters}
|
||||
|
||||
Reference in New Issue
Block a user