From 5292ec1598daa9310cfdeed4bf3be470cd0e88bb Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 2 May 2019 14:22:51 -0700 Subject: [PATCH] Various fixes, init smap for leafnodes with gateways too Signed-off-by: Derek Collison --- server/auth.go | 3 +++ server/client.go | 4 ++-- server/events.go | 14 ++++++++++--- server/leafnode.go | 49 +++++++++++++++++++++++++++---------------- server/monitor.go | 3 +++ server/opts.go | 1 + server/server.go | 2 +- test/leafnode_test.go | 47 +++++++++++++++++++++++++++++++++++++++-- 8 files changed, 97 insertions(+), 26 deletions(-) diff --git a/server/auth.go b/server/auth.go index 087974e5..1cbca4af 100644 --- a/server/auth.go +++ b/server/auth.go @@ -643,6 +643,9 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool { return true } + // For now this means we are binding the leafnode to the global account. + c.registerWithAccount(s.globalAccount()) + // Snapshot server options. opts := s.getOpts() diff --git a/server/client.go b/server/client.go index 003e43ac..3828a30c 100644 --- a/server/client.go +++ b/server/client.go @@ -141,6 +141,7 @@ const ( ServerShutdown AuthenticationExpired WrongGateway + MissingAccount ) // Some flags passed to processMsgResultsEx @@ -549,9 +550,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) error { } c.mu.Lock() - defer c.mu.Unlock() c.user = user - // Assign permissions. if user.Permissions == nil { // Reset perms to nil in case client previously had them. @@ -560,6 +559,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) error { } else { c.setPermissions(user.Permissions) } + c.mu.Unlock() return nil } diff --git a/server/events.go b/server/events.go index ba9d714f..cfa0271a 100644 --- a/server/events.go +++ b/server/events.go @@ -773,7 +773,7 @@ func (s *Server) accountConnectEvent(c *client) { Start: c.start, Host: c.host, ID: c.cid, - Account: c.acc.Name, + Account: accForClient(c), User: nameForClient(c), Name: c.opts.Name, Lang: c.opts.Lang, @@ -812,7 +812,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string) Stop: &now, Host: c.host, ID: c.cid, - Account: c.acc.Name, + Account: accForClient(c), User: nameForClient(c), Name: c.opts.Name, Lang: c.opts.Lang, @@ -853,7 +853,7 @@ func (s *Server) sendAuthErrorEvent(c *client) { Stop: &now, Host: c.host, ID: c.cid, - Account: c.acc.Name, + Account: accForClient(c), User: nameForClient(c), Name: c.opts.Name, Lang: c.opts.Lang, @@ -941,6 +941,14 @@ func nameForClient(c *client) string { return "N/A" } +// Helper to grab account name for a client. +func accForClient(c *client) string { + if c.acc != nil { + return c.acc.Name + } + return "N/A" +} + // Helper to clear timers. func clearTimer(tp **time.Timer) { if t := *tp; t != nil { diff --git a/server/leafnode.go b/server/leafnode.go index 09f41e2b..710f6a05 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -73,7 +73,7 @@ func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool { return false } -// Ensure that gateway is properly configured. +// Ensure that leafnode is properly configured. func validateLeafNode(o *Options) error { if o.LeafNode.Port == 0 { return nil @@ -443,9 +443,13 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client { remote.LocalAccount = globalAccountName } // FIXME(dlc) - Make this resolve at startup. - c.acc, _ = s.LookupAccount(remote.LocalAccount) - // Make sure we register with the account here. - c.registerWithAccount(c.acc) + acc, err := s.LookupAccount(remote.LocalAccount) + if err != nil { + c.Debugf("Can not locate local account %q for leafnode", remote.LocalAccount) + c.closeConnection(MissingAccount) + return nil + } + c.acc = acc c.leaf.remote = remote } @@ -541,6 +545,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client { c.sendLeafConnect(tlsRequired) c.Debugf("Remote leaf node connect msg sent") + } else { // Send our info to the other side. // Remember the nonce we sent here for signatures, etc. @@ -600,8 +605,12 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client { c.mu.Unlock() // Update server's accounting here if we solicited. + // Also send our local subs. if solicited { + // Make sure we register with the account here. + c.registerWithAccount(c.acc) s.addLeafNodeConnection(c) + c.sendAllAccountSubs() } return c @@ -762,7 +771,7 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro c.opts.Pedantic = false // Create and initialize the smap since we know our bound account now. - c.initLeafNodeSmap() + s.initLeafNodeSmap(c) // We are good to go, send over all the bound account subscriptions. s.startGoRoutine(func() { @@ -782,17 +791,18 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro // Snapshot the current subscriptions from the sublist into our smap which // we will keep updated from now on. -func (c *client) initLeafNodeSmap() { +func (s *Server) initLeafNodeSmap(c *client) { acc := c.acc if acc == nil { c.Debugf("Leaf node does not have an account bound") return } - // Collect all subs here. + // Collect all account subs here. _subs := [32]*subscription{} subs := _subs[:0] ims := []string{} acc.mu.RLock() + accName := acc.Name acc.sl.All(&subs) // Since leaf nodes only send on interest, if the bound // account has import services we need to send those over. @@ -801,6 +811,17 @@ func (c *client) initLeafNodeSmap() { } acc.mu.RUnlock() + // Now check for gateway interest. Leafnodes will put this into + // the proper mode to propagate, but they are not held in the account. + gwsa := [16]*client{} + gws := gwsa[:0] + s.getOutboundGatewayConnections(&gws) + for _, cgw := range gws { + if ei, _ := cgw.gw.outsim.Load(accName); ei != nil { + ei.(*outsie).sl.All(&subs) + } + } + // Now walk the results and add them to our smap c.mu.Lock() for _, sub := range subs { @@ -900,23 +921,21 @@ func keyFromSub(sub *subscription) string { // Send all subscriptions for this account that include local // and all subscriptions besides our own. func (c *client) sendAllAccountSubs() { - c.mu.Lock() - defer c.mu.Unlock() - // Hold all at once for now. var b bytes.Buffer + c.mu.Lock() for key, n := range c.leaf.smap { c.writeLeafSub(&b, key, n) } // We will make sure we don't overflow here due to an max_pending. chunks := protoChunks(b.Bytes(), MAX_PAYLOAD_SIZE) - for _, chunk := range chunks { c.queueOutbound(chunk) c.flushOutbound() } + c.mu.Unlock() } func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) { @@ -1075,13 +1094,7 @@ func (c *client) processLeafUnsub(arg []byte) error { updateGWs = srv.gateway.enabled } - // Treat leaf node subscriptions similar to a client subscription, meaning we - // send them to both routes and gateways and other leaf nodes. We also do - // the shadow subscriptions. - if err := c.addShadowSubscriptions(acc, sub); err != nil { - c.Errorf(err.Error()) - } - // If we are routing add to the route map for the associated account. + // If we are routing subtract from the route map for the associated account. srv.updateRouteSubscriptionMap(acc, sub, -1) // Gateways if updateGWs { diff --git a/server/monitor.go b/server/monitor.go index 55034b40..7e85de45 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -939,6 +939,7 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) { v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) v.MaxPending = opts.MaxPending v.WriteDeadline = opts.WriteDeadline + // FIXME(dlc) - make this multi-account aware. v.Subscriptions = s.gacc.sl.Count() v.ConfigLoadTime = s.configTime // Need a copy here since s.httpReqStats can change while doing @@ -1045,6 +1046,8 @@ func (reason ClosedState) String() string { return "Authentication Expired" case WrongGateway: return "Wrong Gateway" + case MissingAccount: + return "Missing Account" } return "Unknown State" } diff --git a/server/opts.go b/server/opts.go index 04a9de2e..4d34e6a0 100644 --- a/server/opts.go +++ b/server/opts.go @@ -227,6 +227,7 @@ func (o *Options) Clone() *Options { clone.Gateway.Gateways[i] = g.clone() } } + // FIXME(dlc) - clone leaf node stuff. return clone } diff --git a/server/server.go b/server/server.go index 2e79fd57..61a00376 100644 --- a/server/server.go +++ b/server/server.go @@ -975,7 +975,7 @@ func (s *Server) Start() { <-ch } - // Solict remote servers for leaf node connections. + // Solicit remote servers for leaf node connections. if len(opts.LeafNode.Remotes) > 0 { s.solicitLeafNodeRemotes(opts.LeafNode.Remotes) } diff --git a/test/leafnode_test.go b/test/leafnode_test.go index c3c23ed7..b771aef1 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -654,6 +654,51 @@ func TestLeafNodeGatewaySendsSystemEvent(t *testing.T) { } } +func TestLeafNodeGatewayInterestPropagation(t *testing.T) { + server.SetGatewaysSolicitDelay(10 * time.Millisecond) + defer server.ResetGatewaysSolicitDelay() + + ca := createClusterWithName(t, "A", 3) + defer shutdownCluster(ca) + cb := createClusterWithName(t, "B", 3, ca) + defer shutdownCluster(cb) + + sl1, sl1Opts := runSolicitLeafServer(ca.opts[1]) + defer sl1.Shutdown() + + c := createClientConn(t, sl1Opts.Host, sl1Opts.Port) + defer c.Close() + + send, expect := setupConn(t, c) + send("SUB foo 1\r\n") + send("PING\r\n") + expect(pongRe) + + // Now we will create a new leaf node on cluster B, expect to get the + // interest for "foo". + opts := cb.opts[0] + lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) + defer lc.Close() + _, leafExpect := setupConn(t, lc) + buf := leafExpect(lsubRe) + if !strings.Contains(string(buf), "foo") { + t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", buf) + } +} + +func TestLeafNodeAuthSystemEventNoCrash(t *testing.T) { + ca := createClusterWithName(t, "A", 1) + defer shutdownCluster(ca) + + opts := ca.opts[0] + lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) + defer lc.Close() + + leafSend := sendCommand(t, lc) + leafSend("LS+ foo\r\n") + checkInfoMsg(t, lc) +} + func TestLeafNodeWithRouteAndGateway(t *testing.T) { server.SetGatewaysSolicitDelay(50 * time.Millisecond) defer server.ResetGatewaysSolicitDelay() @@ -702,7 +747,6 @@ func TestLeafNodeWithRouteAndGateway(t *testing.T) { expect(pongRe) leafExpect(lsubRe) - //leafSend("LMSG foo + myreply bar 2\r\nOK\r\n") leafSend("LMSG foo 2\r\nOK\r\n") expectNothing(t, lc) @@ -1641,7 +1685,6 @@ func TestLeafNodeConnectionLimitsSingleServer(t *testing.T) { defer s4.Shutdown() if nln := acc.NumLeafNodes(); nln != 2 { - fmt.Printf("Acc is %q\n", acc.Name) t.Fatalf("Expected 2 leaf nodes, got %d", nln) }