mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Various fixes, init smap for leafnodes with gateways too
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -643,6 +643,9 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// For now this means we are binding the leafnode to the global account.
|
||||
c.registerWithAccount(s.globalAccount())
|
||||
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
|
||||
@@ -141,6 +141,7 @@ const (
|
||||
ServerShutdown
|
||||
AuthenticationExpired
|
||||
WrongGateway
|
||||
MissingAccount
|
||||
)
|
||||
|
||||
// Some flags passed to processMsgResultsEx
|
||||
@@ -549,9 +550,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) error {
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.user = user
|
||||
|
||||
// Assign permissions.
|
||||
if user.Permissions == nil {
|
||||
// Reset perms to nil in case client previously had them.
|
||||
@@ -560,6 +559,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) error {
|
||||
} else {
|
||||
c.setPermissions(user.Permissions)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -773,7 +773,7 @@ func (s *Server) accountConnectEvent(c *client) {
|
||||
Start: c.start,
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: c.acc.Name,
|
||||
Account: accForClient(c),
|
||||
User: nameForClient(c),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
@@ -812,7 +812,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
|
||||
Stop: &now,
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: c.acc.Name,
|
||||
Account: accForClient(c),
|
||||
User: nameForClient(c),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
@@ -853,7 +853,7 @@ func (s *Server) sendAuthErrorEvent(c *client) {
|
||||
Stop: &now,
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: c.acc.Name,
|
||||
Account: accForClient(c),
|
||||
User: nameForClient(c),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
@@ -941,6 +941,14 @@ func nameForClient(c *client) string {
|
||||
return "N/A"
|
||||
}
|
||||
|
||||
// Helper to grab account name for a client.
|
||||
func accForClient(c *client) string {
|
||||
if c.acc != nil {
|
||||
return c.acc.Name
|
||||
}
|
||||
return "N/A"
|
||||
}
|
||||
|
||||
// Helper to clear timers.
|
||||
func clearTimer(tp **time.Timer) {
|
||||
if t := *tp; t != nil {
|
||||
|
||||
@@ -73,7 +73,7 @@ func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Ensure that gateway is properly configured.
|
||||
// Ensure that leafnode is properly configured.
|
||||
func validateLeafNode(o *Options) error {
|
||||
if o.LeafNode.Port == 0 {
|
||||
return nil
|
||||
@@ -443,9 +443,13 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
remote.LocalAccount = globalAccountName
|
||||
}
|
||||
// FIXME(dlc) - Make this resolve at startup.
|
||||
c.acc, _ = s.LookupAccount(remote.LocalAccount)
|
||||
// Make sure we register with the account here.
|
||||
c.registerWithAccount(c.acc)
|
||||
acc, err := s.LookupAccount(remote.LocalAccount)
|
||||
if err != nil {
|
||||
c.Debugf("Can not locate local account %q for leafnode", remote.LocalAccount)
|
||||
c.closeConnection(MissingAccount)
|
||||
return nil
|
||||
}
|
||||
c.acc = acc
|
||||
c.leaf.remote = remote
|
||||
}
|
||||
|
||||
@@ -541,6 +545,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
|
||||
c.sendLeafConnect(tlsRequired)
|
||||
c.Debugf("Remote leaf node connect msg sent")
|
||||
|
||||
} else {
|
||||
// Send our info to the other side.
|
||||
// Remember the nonce we sent here for signatures, etc.
|
||||
@@ -600,8 +605,12 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
c.mu.Unlock()
|
||||
|
||||
// Update server's accounting here if we solicited.
|
||||
// Also send our local subs.
|
||||
if solicited {
|
||||
// Make sure we register with the account here.
|
||||
c.registerWithAccount(c.acc)
|
||||
s.addLeafNodeConnection(c)
|
||||
c.sendAllAccountSubs()
|
||||
}
|
||||
|
||||
return c
|
||||
@@ -762,7 +771,7 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
c.opts.Pedantic = false
|
||||
|
||||
// Create and initialize the smap since we know our bound account now.
|
||||
c.initLeafNodeSmap()
|
||||
s.initLeafNodeSmap(c)
|
||||
|
||||
// We are good to go, send over all the bound account subscriptions.
|
||||
s.startGoRoutine(func() {
|
||||
@@ -782,17 +791,18 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
|
||||
// Snapshot the current subscriptions from the sublist into our smap which
|
||||
// we will keep updated from now on.
|
||||
func (c *client) initLeafNodeSmap() {
|
||||
func (s *Server) initLeafNodeSmap(c *client) {
|
||||
acc := c.acc
|
||||
if acc == nil {
|
||||
c.Debugf("Leaf node does not have an account bound")
|
||||
return
|
||||
}
|
||||
// Collect all subs here.
|
||||
// Collect all account subs here.
|
||||
_subs := [32]*subscription{}
|
||||
subs := _subs[:0]
|
||||
ims := []string{}
|
||||
acc.mu.RLock()
|
||||
accName := acc.Name
|
||||
acc.sl.All(&subs)
|
||||
// Since leaf nodes only send on interest, if the bound
|
||||
// account has import services we need to send those over.
|
||||
@@ -801,6 +811,17 @@ func (c *client) initLeafNodeSmap() {
|
||||
}
|
||||
acc.mu.RUnlock()
|
||||
|
||||
// Now check for gateway interest. Leafnodes will put this into
|
||||
// the proper mode to propagate, but they are not held in the account.
|
||||
gwsa := [16]*client{}
|
||||
gws := gwsa[:0]
|
||||
s.getOutboundGatewayConnections(&gws)
|
||||
for _, cgw := range gws {
|
||||
if ei, _ := cgw.gw.outsim.Load(accName); ei != nil {
|
||||
ei.(*outsie).sl.All(&subs)
|
||||
}
|
||||
}
|
||||
|
||||
// Now walk the results and add them to our smap
|
||||
c.mu.Lock()
|
||||
for _, sub := range subs {
|
||||
@@ -900,23 +921,21 @@ func keyFromSub(sub *subscription) string {
|
||||
// Send all subscriptions for this account that include local
|
||||
// and all subscriptions besides our own.
|
||||
func (c *client) sendAllAccountSubs() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// Hold all at once for now.
|
||||
var b bytes.Buffer
|
||||
|
||||
c.mu.Lock()
|
||||
for key, n := range c.leaf.smap {
|
||||
c.writeLeafSub(&b, key, n)
|
||||
}
|
||||
|
||||
// We will make sure we don't overflow here due to an max_pending.
|
||||
chunks := protoChunks(b.Bytes(), MAX_PAYLOAD_SIZE)
|
||||
|
||||
for _, chunk := range chunks {
|
||||
c.queueOutbound(chunk)
|
||||
c.flushOutbound()
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
|
||||
@@ -1075,13 +1094,7 @@ func (c *client) processLeafUnsub(arg []byte) error {
|
||||
updateGWs = srv.gateway.enabled
|
||||
}
|
||||
|
||||
// Treat leaf node subscriptions similar to a client subscription, meaning we
|
||||
// send them to both routes and gateways and other leaf nodes. We also do
|
||||
// the shadow subscriptions.
|
||||
if err := c.addShadowSubscriptions(acc, sub); err != nil {
|
||||
c.Errorf(err.Error())
|
||||
}
|
||||
// If we are routing add to the route map for the associated account.
|
||||
// If we are routing subtract from the route map for the associated account.
|
||||
srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
// Gateways
|
||||
if updateGWs {
|
||||
|
||||
@@ -939,6 +939,7 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) {
|
||||
v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
|
||||
v.MaxPending = opts.MaxPending
|
||||
v.WriteDeadline = opts.WriteDeadline
|
||||
// FIXME(dlc) - make this multi-account aware.
|
||||
v.Subscriptions = s.gacc.sl.Count()
|
||||
v.ConfigLoadTime = s.configTime
|
||||
// Need a copy here since s.httpReqStats can change while doing
|
||||
@@ -1045,6 +1046,8 @@ func (reason ClosedState) String() string {
|
||||
return "Authentication Expired"
|
||||
case WrongGateway:
|
||||
return "Wrong Gateway"
|
||||
case MissingAccount:
|
||||
return "Missing Account"
|
||||
}
|
||||
return "Unknown State"
|
||||
}
|
||||
|
||||
@@ -227,6 +227,7 @@ func (o *Options) Clone() *Options {
|
||||
clone.Gateway.Gateways[i] = g.clone()
|
||||
}
|
||||
}
|
||||
// FIXME(dlc) - clone leaf node stuff.
|
||||
return clone
|
||||
}
|
||||
|
||||
|
||||
@@ -975,7 +975,7 @@ func (s *Server) Start() {
|
||||
<-ch
|
||||
}
|
||||
|
||||
// Solict remote servers for leaf node connections.
|
||||
// Solicit remote servers for leaf node connections.
|
||||
if len(opts.LeafNode.Remotes) > 0 {
|
||||
s.solicitLeafNodeRemotes(opts.LeafNode.Remotes)
|
||||
}
|
||||
|
||||
@@ -654,6 +654,51 @@ func TestLeafNodeGatewaySendsSystemEvent(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeGatewayInterestPropagation(t *testing.T) {
|
||||
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
|
||||
ca := createClusterWithName(t, "A", 3)
|
||||
defer shutdownCluster(ca)
|
||||
cb := createClusterWithName(t, "B", 3, ca)
|
||||
defer shutdownCluster(cb)
|
||||
|
||||
sl1, sl1Opts := runSolicitLeafServer(ca.opts[1])
|
||||
defer sl1.Shutdown()
|
||||
|
||||
c := createClientConn(t, sl1Opts.Host, sl1Opts.Port)
|
||||
defer c.Close()
|
||||
|
||||
send, expect := setupConn(t, c)
|
||||
send("SUB foo 1\r\n")
|
||||
send("PING\r\n")
|
||||
expect(pongRe)
|
||||
|
||||
// Now we will create a new leaf node on cluster B, expect to get the
|
||||
// interest for "foo".
|
||||
opts := cb.opts[0]
|
||||
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc.Close()
|
||||
_, leafExpect := setupConn(t, lc)
|
||||
buf := leafExpect(lsubRe)
|
||||
if !strings.Contains(string(buf), "foo") {
|
||||
t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", buf)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeAuthSystemEventNoCrash(t *testing.T) {
|
||||
ca := createClusterWithName(t, "A", 1)
|
||||
defer shutdownCluster(ca)
|
||||
|
||||
opts := ca.opts[0]
|
||||
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc.Close()
|
||||
|
||||
leafSend := sendCommand(t, lc)
|
||||
leafSend("LS+ foo\r\n")
|
||||
checkInfoMsg(t, lc)
|
||||
}
|
||||
|
||||
func TestLeafNodeWithRouteAndGateway(t *testing.T) {
|
||||
server.SetGatewaysSolicitDelay(50 * time.Millisecond)
|
||||
defer server.ResetGatewaysSolicitDelay()
|
||||
@@ -702,7 +747,6 @@ func TestLeafNodeWithRouteAndGateway(t *testing.T) {
|
||||
expect(pongRe)
|
||||
leafExpect(lsubRe)
|
||||
|
||||
//leafSend("LMSG foo + myreply bar 2\r\nOK\r\n")
|
||||
leafSend("LMSG foo 2\r\nOK\r\n")
|
||||
expectNothing(t, lc)
|
||||
|
||||
@@ -1641,7 +1685,6 @@ func TestLeafNodeConnectionLimitsSingleServer(t *testing.T) {
|
||||
defer s4.Shutdown()
|
||||
|
||||
if nln := acc.NumLeafNodes(); nln != 2 {
|
||||
fmt.Printf("Acc is %q\n", acc.Name)
|
||||
t.Fatalf("Expected 2 leaf nodes, got %d", nln)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user