diff --git a/server/client.go b/server/client.go index 83435241..bb93d71d 100644 --- a/server/client.go +++ b/server/client.go @@ -2953,6 +2953,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g if client.kind == LEAF && client.perms != nil { if !client.pubAllowed(string(subject)) { client.mu.Unlock() + client.Debugf("Not permitted to publish to %q", subject) return false } } diff --git a/server/leafnode.go b/server/leafnode.go index b5ead9e4..ef63c0fc 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -472,17 +472,18 @@ func (s *Server) startLeafNodeAcceptLoop() { tlsRequired := opts.LeafNode.TLSConfig != nil tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert info := Info{ - ID: s.info.ID, - Name: s.info.Name, - Version: s.info.Version, - GitCommit: gitCommit, - GoVersion: runtime.Version(), - AuthRequired: true, - TLSRequired: tlsRequired, - TLSVerify: tlsVerify, - MaxPayload: s.info.MaxPayload, // TODO(dlc) - Allow override? - Headers: s.supportsHeaders(), - Proto: 1, // Fixed for now. + ID: s.info.ID, + Name: s.info.Name, + Version: s.info.Version, + GitCommit: gitCommit, + GoVersion: runtime.Version(), + AuthRequired: true, + TLSRequired: tlsRequired, + TLSVerify: tlsVerify, + MaxPayload: s.info.MaxPayload, // TODO(dlc) - Allow override? + Headers: s.supportsHeaders(), + Proto: 1, // Fixed for now. + InfoOnConnect: true, } // If we have selected a random port... if port == 0 { @@ -902,7 +903,7 @@ func (c *client) processLeafnodeInfo(info *Info) { c.setPermissions(perms) } - var finishConnect bool + var resumeConnect bool var s *Server // If this is a remote connection and this is the first INFO protocol, @@ -910,12 +911,19 @@ func (c *client) processLeafnodeInfo(info *Info) { if firstINFO && c.leaf.remote != nil { // Clear deadline that was set in createLeafNode while waiting for the INFO. c.nc.SetDeadline(time.Time{}) - finishConnect = true - s = c.srv + resumeConnect = true } + s = c.srv c.mu.Unlock() - if finishConnect && s != nil { + finishConnect := info.ConnectInfo && !firstINFO + if resumeConnect && s != nil { + s.leafNodeResumeConnectProcess(c) + if !info.InfoOnConnect { + finishConnect = true + } + } + if finishConnect { s.leafNodeFinishConnectProcess(c) } } @@ -1177,15 +1185,13 @@ func (c *client) remoteCluster() string { // Sends back an info block to the soliciting leafnode to let it know about // its permission settings for local enforcement. func (s *Server) sendPermsInfo(c *client) { - if c.perms == nil { - return - } // Copy info := s.copyLeafNodeInfo() c.mu.Lock() info.CID = c.cid info.Import = c.opts.Import info.Export = c.opts.Export + info.ConnectInfo = true b, _ := json.Marshal(info) pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)} c.enqueueProto(bytes.Join(pcs, []byte(" "))) @@ -1207,6 +1213,7 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { ims := []string{} acc.mu.Lock() accName := acc.Name + accNTag := acc.nameTag // If we are solicited we only send interest for local clients. if c.isSpokeLeafNode() { acc.sl.localSubs(&subs) @@ -1220,6 +1227,10 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { // Since leaf nodes only send on interest, if the bound // account has import services we need to send those over. for isubj := range acc.imports.services { + if !c.canSubscribe(isubj) { + c.Debugf("Not permitted to import service %s on behalf of %s/%s", isubj, accName, accNTag) + continue + } ims = append(ims, isubj) } // Create a unique subject that will be used for loop detection. @@ -1260,6 +1271,10 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { rc := c.leaf.remoteCluster c.leaf.smap = make(map[string]int32) for _, sub := range subs { + if !c.canSubscribe(string(sub.subject)) { + c.Debugf("Not permitted to subscribe to %s on behalf of %s/%s", string(sub.subject), accName, accNTag) + continue + } // We ignore ourselves here. // Also don't add the subscription if it has a origin cluster and the // cluster name matches the one of the client we are sending to. @@ -1348,6 +1363,8 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) { // Check to make sure this sub does not have an origin cluster than matches the leafnode. ln.mu.Lock() skip := sub.origin != nil && string(sub.origin) == ln.remoteCluster() + // do not skip on !ln.canSubscribe(string(sub.subject)) + // Given allow:foo, > would be rejected. For leaf nodes filtering is done on the (soliciting) end ( ln.mu.Unlock() if skip { continue @@ -2108,9 +2125,8 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot } // This is invoked for remote LEAF remote connections after processing the INFO -// protocol. This will do the TLS handshake (if needed be), send the CONNECT protocol -// and register the leaf node. -func (s *Server) leafNodeFinishConnectProcess(c *client) { +// protocol. This will do the TLS handshake (if needed be) +func (s *Server) leafNodeResumeConnectProcess(c *client) { clusterName := s.ClusterName() c.mu.Lock() @@ -2120,11 +2136,6 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) { } remote := c.leaf.remote - // Check if we will need to send the system connect event. - remote.RLock() - sendSysConnectEvent := remote.Hub - remote.RUnlock() - var tlsRequired bool // In case of websocket, the TLS handshake has been already done. @@ -2164,8 +2175,44 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) { // Spin up the write loop. s.startGoRoutine(func() { c.writeLoop() }) + cid := c.cid + c.mu.Unlock() c.Debugf("Remote leafnode connect msg sent") + // timeout leafNodeFinishConnectProcess + time.AfterFunc(s.getOpts().PingInterval, func() { + s.mu.Lock() + // check if addLeafNodeConnection was called by leafNodeFinishConnectProcess + _, found := s.leafs[cid] + s.mu.Unlock() + if !found { + c.mu.Lock() + closed := c.isClosed() + c.mu.Unlock() + if !closed { + c.sendErrAndDebug("Stale Leaf Node Connection - Closing") + c.closeConnection(StaleConnection) + } + } + }) +} + +// This is invoked for remote LEAF remote connections after processing the INFO +// protocol and leafNodeResumeConnectProcess. +// This will send LS+ the CONNECT protocol and register the leaf node. +func (s *Server) leafNodeFinishConnectProcess(c *client) { + c.mu.Lock() + if c.isClosed() { + c.mu.Unlock() + s.removeLeafNodeConnection(c) + return + } + remote := c.leaf.remote + // Check if we will need to send the system connect event. + remote.RLock() + sendSysConnectEvent := remote.Hub + remote.RUnlock() + // Capture account before releasing lock acc := c.acc c.mu.Unlock() diff --git a/server/server.go b/server/server.go index 63688f28..c957297c 100644 --- a/server/server.go +++ b/server/server.go @@ -83,9 +83,11 @@ type Info struct { LameDuckMode bool `json:"ldm,omitempty"` // Route Specific - Import *SubjectPermission `json:"import,omitempty"` - Export *SubjectPermission `json:"export,omitempty"` - LNOC bool `json:"lnoc,omitempty"` + Import *SubjectPermission `json:"import,omitempty"` + Export *SubjectPermission `json:"export,omitempty"` + LNOC bool `json:"lnoc,omitempty"` + InfoOnConnect bool `json:"info_on_connect,omitempty"` // When true the server will respond connect to with an INFO + ConnectInfo bool `json:"connect_info,omitempty"` // When true this is the servers response to CONNECT // Gateways Specific Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO) diff --git a/test/gateway_test.go b/test/gateway_test.go index 9713bec9..efef6065 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -62,14 +62,22 @@ func setupGatewayConn(t testing.TB, c net.Conn, org, dst string) (sendFun, expec return sendCommand(t, c), expectCommand(t, c) } -func expectNumberOfProtos(t *testing.T, expFn expectFun, proto *regexp.Regexp, expected int) { +func expectNumberOfProtos(t *testing.T, expFn expectFun, proto *regexp.Regexp, expected int, ignore ...*regexp.Regexp) { t.Helper() + buf := []byte(nil) for count := 0; count != expected; { - buf := expFn(proto) + buf = append(buf, expFn(anyRe)...) + for _, skip := range ignore { + buf = skip.ReplaceAll(buf, []byte(``)) + } count += len(proto.FindAllSubmatch(buf, -1)) if count > expected { t.Fatalf("Expected %v matches, got %v", expected, count) } + buf = proto.ReplaceAll(buf, []byte(``)) + } + if len(buf) != 0 { + t.Fatalf("did not consume everything, left with: %q", buf) } } diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 222c1599..7511698e 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -200,7 +200,7 @@ func setupLeaf(t *testing.T, lc net.Conn, expectedSubs int) (sendFun, expectFun) send, expect := setupConn(t, lc) // A loop detection subscription is sent, so consume this here, along // with the ones that caller expect on setup. - expectNumberOfProtos(t, expect, lsubRe, expectedSubs) + expectNumberOfProtos(t, expect, lsubRe, expectedSubs, infoRe, pingRe) return send, expect } @@ -871,17 +871,28 @@ func TestLeafNodeGatewayInterestPropagation(t *testing.T) { lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() _, leafExpect := setupConn(t, lc) - var totalBuf []byte + buf := leafExpect(infoRe) + buf = infoRe.ReplaceAll(buf, []byte(nil)) + foundFoo := false for count := 0; count != 5; { - buf := leafExpect(lsubRe) - totalBuf = append(totalBuf, buf...) + // skip first time if we still have data (buf from above may already have some left) + if count != 0 || len(buf) == 0 { + buf = append(buf, leafExpect(anyRe)...) + } count += len(lsubRe.FindAllSubmatch(buf, -1)) if count > 5 { - t.Fatalf("Expected %v matches, got %v (buf=%s)", 4, count, totalBuf) + t.Fatalf("Expected %v matches, got %v (buf=%s)", 4, count, buf) } + if strings.Contains(string(buf), "foo") { + foundFoo = true + } + buf = lsubRe.ReplaceAll(buf, []byte(nil)) } - if !strings.Contains(string(totalBuf), "foo") { - t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", totalBuf) + if len(buf) != 0 { + t.Fatalf("did not consume everything, left with: %q", buf) + } + if !foundFoo { + t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", buf) } } @@ -1156,6 +1167,7 @@ func TestLeafNodeBasicAuth(t *testing.T) { lc = createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() leafSend, leafExpect := setupConnWithUserPass(t, lc, "derek", "s3cr3t!") + leafExpect(infoRe) leafExpect(lsubRe) leafSend("PING\r\n") leafExpect(pongRe) @@ -1422,6 +1434,8 @@ func TestLeafNodeUserPermsForConnection(t *testing.T) { nuc.Permissions.Pub.Allow.Add("foo.>") nuc.Permissions.Pub.Allow.Add("baz.>") nuc.Permissions.Sub.Allow.Add("foo.>") + // we would be immediately disconnected if that would not work + nuc.Permissions.Sub.Deny.Add("$SYS.>") ujwt, err := nuc.Encode(akp) if err != nil { t.Fatalf("Error generating user JWT: %v", err) diff --git a/test/test.go b/test/test.go index c3512826..a872b04b 100644 --- a/test/test.go +++ b/test/test.go @@ -271,7 +271,7 @@ func setupConnWithUserPass(t tLogger, c net.Conn, username, password string) (se cs := fmt.Sprintf("CONNECT {\"verbose\":%v,\"pedantic\":%v,\"tls_required\":%v,\"protocol\":1,\"user\":%q,\"pass\":%q}\r\n", false, false, false, username, password) sendProto(t, c, cs) - return sendCommand(t, c), expectCommand(t, c) + return sendCommand(t, c), expectLefMostCommand(t, c) } type sendFun func(string) @@ -291,6 +291,14 @@ func expectCommand(t tLogger, c net.Conn) expectFun { } } +// Closure version for easier reading +func expectLefMostCommand(t tLogger, c net.Conn) expectFun { + var buf []byte + return func(re *regexp.Regexp) []byte { + return expectLeftMostResult(t, c, re, &buf) + } +} + // Send the protocol command to the server. func sendProto(t tLogger, c net.Conn, op string) { n, err := c.Write([]byte(op)) @@ -303,6 +311,7 @@ func sendProto(t tLogger, c net.Conn, op string) { } var ( + anyRe = regexp.MustCompile(`.*`) infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`) pingRe = regexp.MustCompile(`^PING\r\n`) pongRe = regexp.MustCompile(`^PONG\r\n`) @@ -341,6 +350,41 @@ const ( replyAndQueueIndex = 3 ) +// Test result from server against regexp and return left most match +func expectLeftMostResult(t tLogger, c net.Conn, re *regexp.Regexp, buf *[]byte) []byte { + recv := func() []byte { + expBuf := make([]byte, 32768) + // Wait for commands to be processed and results queued for read + c.SetReadDeadline(time.Now().Add(2 * time.Second)) + n, err := c.Read(expBuf) + c.SetReadDeadline(time.Time{}) + + if n <= 0 && err != nil { + stackFatalf(t, "Error reading from conn: %v\n", err) + } + return expBuf[:n] + } + if len(*buf) == 0 { + *buf = recv() + } + emptyCnt := 0 + for { + result := re.Find(*buf) + if result == nil { + emptyCnt++ + if emptyCnt > 5 { + stackFatalf(t, "Reading empty data too often\n") + } + *buf = append(*buf, recv()...) + } else { + emptyCnt = 0 + cutIdx := strings.Index(string(*buf), string(result)) + len(result) + *buf = (*buf)[cutIdx:] + return result + } + } +} + // Test result from server against regexp func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte { expBuf := make([]byte, 32768)