mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[Fixed] leaf node subscription permission negotiation.
On connect all subscription where sent by the soliciting leaf node. If creds contains sub deny permissions, the leaf node would be disconnected. This waits for the permissions to be exchanged and checks permissions before sending subscriptions. Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
committed by
Matthias Hanel
parent
112257d09d
commit
5d1f36dd17
@@ -2953,6 +2953,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g
|
||||
if client.kind == LEAF && client.perms != nil {
|
||||
if !client.pubAllowed(string(subject)) {
|
||||
client.mu.Unlock()
|
||||
client.Debugf("Not permitted to publish to %q", subject)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -472,17 +472,18 @@ func (s *Server) startLeafNodeAcceptLoop() {
|
||||
tlsRequired := opts.LeafNode.TLSConfig != nil
|
||||
tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
|
||||
info := Info{
|
||||
ID: s.info.ID,
|
||||
Name: s.info.Name,
|
||||
Version: s.info.Version,
|
||||
GitCommit: gitCommit,
|
||||
GoVersion: runtime.Version(),
|
||||
AuthRequired: true,
|
||||
TLSRequired: tlsRequired,
|
||||
TLSVerify: tlsVerify,
|
||||
MaxPayload: s.info.MaxPayload, // TODO(dlc) - Allow override?
|
||||
Headers: s.supportsHeaders(),
|
||||
Proto: 1, // Fixed for now.
|
||||
ID: s.info.ID,
|
||||
Name: s.info.Name,
|
||||
Version: s.info.Version,
|
||||
GitCommit: gitCommit,
|
||||
GoVersion: runtime.Version(),
|
||||
AuthRequired: true,
|
||||
TLSRequired: tlsRequired,
|
||||
TLSVerify: tlsVerify,
|
||||
MaxPayload: s.info.MaxPayload, // TODO(dlc) - Allow override?
|
||||
Headers: s.supportsHeaders(),
|
||||
Proto: 1, // Fixed for now.
|
||||
InfoOnConnect: true,
|
||||
}
|
||||
// If we have selected a random port...
|
||||
if port == 0 {
|
||||
@@ -902,7 +903,7 @@ func (c *client) processLeafnodeInfo(info *Info) {
|
||||
c.setPermissions(perms)
|
||||
}
|
||||
|
||||
var finishConnect bool
|
||||
var resumeConnect bool
|
||||
var s *Server
|
||||
|
||||
// If this is a remote connection and this is the first INFO protocol,
|
||||
@@ -910,12 +911,19 @@ func (c *client) processLeafnodeInfo(info *Info) {
|
||||
if firstINFO && c.leaf.remote != nil {
|
||||
// Clear deadline that was set in createLeafNode while waiting for the INFO.
|
||||
c.nc.SetDeadline(time.Time{})
|
||||
finishConnect = true
|
||||
s = c.srv
|
||||
resumeConnect = true
|
||||
}
|
||||
s = c.srv
|
||||
c.mu.Unlock()
|
||||
|
||||
if finishConnect && s != nil {
|
||||
finishConnect := info.ConnectInfo && !firstINFO
|
||||
if resumeConnect && s != nil {
|
||||
s.leafNodeResumeConnectProcess(c)
|
||||
if !info.InfoOnConnect {
|
||||
finishConnect = true
|
||||
}
|
||||
}
|
||||
if finishConnect {
|
||||
s.leafNodeFinishConnectProcess(c)
|
||||
}
|
||||
}
|
||||
@@ -1177,15 +1185,13 @@ func (c *client) remoteCluster() string {
|
||||
// Sends back an info block to the soliciting leafnode to let it know about
|
||||
// its permission settings for local enforcement.
|
||||
func (s *Server) sendPermsInfo(c *client) {
|
||||
if c.perms == nil {
|
||||
return
|
||||
}
|
||||
// Copy
|
||||
info := s.copyLeafNodeInfo()
|
||||
c.mu.Lock()
|
||||
info.CID = c.cid
|
||||
info.Import = c.opts.Import
|
||||
info.Export = c.opts.Export
|
||||
info.ConnectInfo = true
|
||||
b, _ := json.Marshal(info)
|
||||
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
|
||||
c.enqueueProto(bytes.Join(pcs, []byte(" ")))
|
||||
@@ -1207,6 +1213,7 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
|
||||
ims := []string{}
|
||||
acc.mu.Lock()
|
||||
accName := acc.Name
|
||||
accNTag := acc.nameTag
|
||||
// If we are solicited we only send interest for local clients.
|
||||
if c.isSpokeLeafNode() {
|
||||
acc.sl.localSubs(&subs)
|
||||
@@ -1220,6 +1227,10 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
|
||||
// Since leaf nodes only send on interest, if the bound
|
||||
// account has import services we need to send those over.
|
||||
for isubj := range acc.imports.services {
|
||||
if !c.canSubscribe(isubj) {
|
||||
c.Debugf("Not permitted to import service %s on behalf of %s/%s", isubj, accName, accNTag)
|
||||
continue
|
||||
}
|
||||
ims = append(ims, isubj)
|
||||
}
|
||||
// Create a unique subject that will be used for loop detection.
|
||||
@@ -1260,6 +1271,10 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
|
||||
rc := c.leaf.remoteCluster
|
||||
c.leaf.smap = make(map[string]int32)
|
||||
for _, sub := range subs {
|
||||
if !c.canSubscribe(string(sub.subject)) {
|
||||
c.Debugf("Not permitted to subscribe to %s on behalf of %s/%s", string(sub.subject), accName, accNTag)
|
||||
continue
|
||||
}
|
||||
// We ignore ourselves here.
|
||||
// Also don't add the subscription if it has a origin cluster and the
|
||||
// cluster name matches the one of the client we are sending to.
|
||||
@@ -1348,6 +1363,8 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
|
||||
// Check to make sure this sub does not have an origin cluster than matches the leafnode.
|
||||
ln.mu.Lock()
|
||||
skip := sub.origin != nil && string(sub.origin) == ln.remoteCluster()
|
||||
// do not skip on !ln.canSubscribe(string(sub.subject))
|
||||
// Given allow:foo, > would be rejected. For leaf nodes filtering is done on the (soliciting) end (
|
||||
ln.mu.Unlock()
|
||||
if skip {
|
||||
continue
|
||||
@@ -2108,9 +2125,8 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
|
||||
}
|
||||
|
||||
// This is invoked for remote LEAF remote connections after processing the INFO
|
||||
// protocol. This will do the TLS handshake (if needed be), send the CONNECT protocol
|
||||
// and register the leaf node.
|
||||
func (s *Server) leafNodeFinishConnectProcess(c *client) {
|
||||
// protocol. This will do the TLS handshake (if needed be)
|
||||
func (s *Server) leafNodeResumeConnectProcess(c *client) {
|
||||
clusterName := s.ClusterName()
|
||||
|
||||
c.mu.Lock()
|
||||
@@ -2120,11 +2136,6 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) {
|
||||
}
|
||||
remote := c.leaf.remote
|
||||
|
||||
// Check if we will need to send the system connect event.
|
||||
remote.RLock()
|
||||
sendSysConnectEvent := remote.Hub
|
||||
remote.RUnlock()
|
||||
|
||||
var tlsRequired bool
|
||||
|
||||
// In case of websocket, the TLS handshake has been already done.
|
||||
@@ -2164,8 +2175,44 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) {
|
||||
// Spin up the write loop.
|
||||
s.startGoRoutine(func() { c.writeLoop() })
|
||||
|
||||
cid := c.cid
|
||||
c.mu.Unlock()
|
||||
c.Debugf("Remote leafnode connect msg sent")
|
||||
|
||||
// timeout leafNodeFinishConnectProcess
|
||||
time.AfterFunc(s.getOpts().PingInterval, func() {
|
||||
s.mu.Lock()
|
||||
// check if addLeafNodeConnection was called by leafNodeFinishConnectProcess
|
||||
_, found := s.leafs[cid]
|
||||
s.mu.Unlock()
|
||||
if !found {
|
||||
c.mu.Lock()
|
||||
closed := c.isClosed()
|
||||
c.mu.Unlock()
|
||||
if !closed {
|
||||
c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
|
||||
c.closeConnection(StaleConnection)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// This is invoked for remote LEAF remote connections after processing the INFO
|
||||
// protocol and leafNodeResumeConnectProcess.
|
||||
// This will send LS+ the CONNECT protocol and register the leaf node.
|
||||
func (s *Server) leafNodeFinishConnectProcess(c *client) {
|
||||
c.mu.Lock()
|
||||
if c.isClosed() {
|
||||
c.mu.Unlock()
|
||||
s.removeLeafNodeConnection(c)
|
||||
return
|
||||
}
|
||||
remote := c.leaf.remote
|
||||
// Check if we will need to send the system connect event.
|
||||
remote.RLock()
|
||||
sendSysConnectEvent := remote.Hub
|
||||
remote.RUnlock()
|
||||
|
||||
// Capture account before releasing lock
|
||||
acc := c.acc
|
||||
c.mu.Unlock()
|
||||
|
||||
@@ -83,9 +83,11 @@ type Info struct {
|
||||
LameDuckMode bool `json:"ldm,omitempty"`
|
||||
|
||||
// Route Specific
|
||||
Import *SubjectPermission `json:"import,omitempty"`
|
||||
Export *SubjectPermission `json:"export,omitempty"`
|
||||
LNOC bool `json:"lnoc,omitempty"`
|
||||
Import *SubjectPermission `json:"import,omitempty"`
|
||||
Export *SubjectPermission `json:"export,omitempty"`
|
||||
LNOC bool `json:"lnoc,omitempty"`
|
||||
InfoOnConnect bool `json:"info_on_connect,omitempty"` // When true the server will respond connect to with an INFO
|
||||
ConnectInfo bool `json:"connect_info,omitempty"` // When true this is the servers response to CONNECT
|
||||
|
||||
// Gateways Specific
|
||||
Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO)
|
||||
|
||||
@@ -62,14 +62,22 @@ func setupGatewayConn(t testing.TB, c net.Conn, org, dst string) (sendFun, expec
|
||||
return sendCommand(t, c), expectCommand(t, c)
|
||||
}
|
||||
|
||||
func expectNumberOfProtos(t *testing.T, expFn expectFun, proto *regexp.Regexp, expected int) {
|
||||
func expectNumberOfProtos(t *testing.T, expFn expectFun, proto *regexp.Regexp, expected int, ignore ...*regexp.Regexp) {
|
||||
t.Helper()
|
||||
buf := []byte(nil)
|
||||
for count := 0; count != expected; {
|
||||
buf := expFn(proto)
|
||||
buf = append(buf, expFn(anyRe)...)
|
||||
for _, skip := range ignore {
|
||||
buf = skip.ReplaceAll(buf, []byte(``))
|
||||
}
|
||||
count += len(proto.FindAllSubmatch(buf, -1))
|
||||
if count > expected {
|
||||
t.Fatalf("Expected %v matches, got %v", expected, count)
|
||||
}
|
||||
buf = proto.ReplaceAll(buf, []byte(``))
|
||||
}
|
||||
if len(buf) != 0 {
|
||||
t.Fatalf("did not consume everything, left with: %q", buf)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -200,7 +200,7 @@ func setupLeaf(t *testing.T, lc net.Conn, expectedSubs int) (sendFun, expectFun)
|
||||
send, expect := setupConn(t, lc)
|
||||
// A loop detection subscription is sent, so consume this here, along
|
||||
// with the ones that caller expect on setup.
|
||||
expectNumberOfProtos(t, expect, lsubRe, expectedSubs)
|
||||
expectNumberOfProtos(t, expect, lsubRe, expectedSubs, infoRe, pingRe)
|
||||
return send, expect
|
||||
}
|
||||
|
||||
@@ -871,17 +871,28 @@ func TestLeafNodeGatewayInterestPropagation(t *testing.T) {
|
||||
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc.Close()
|
||||
_, leafExpect := setupConn(t, lc)
|
||||
var totalBuf []byte
|
||||
buf := leafExpect(infoRe)
|
||||
buf = infoRe.ReplaceAll(buf, []byte(nil))
|
||||
foundFoo := false
|
||||
for count := 0; count != 5; {
|
||||
buf := leafExpect(lsubRe)
|
||||
totalBuf = append(totalBuf, buf...)
|
||||
// skip first time if we still have data (buf from above may already have some left)
|
||||
if count != 0 || len(buf) == 0 {
|
||||
buf = append(buf, leafExpect(anyRe)...)
|
||||
}
|
||||
count += len(lsubRe.FindAllSubmatch(buf, -1))
|
||||
if count > 5 {
|
||||
t.Fatalf("Expected %v matches, got %v (buf=%s)", 4, count, totalBuf)
|
||||
t.Fatalf("Expected %v matches, got %v (buf=%s)", 4, count, buf)
|
||||
}
|
||||
if strings.Contains(string(buf), "foo") {
|
||||
foundFoo = true
|
||||
}
|
||||
buf = lsubRe.ReplaceAll(buf, []byte(nil))
|
||||
}
|
||||
if !strings.Contains(string(totalBuf), "foo") {
|
||||
t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", totalBuf)
|
||||
if len(buf) != 0 {
|
||||
t.Fatalf("did not consume everything, left with: %q", buf)
|
||||
}
|
||||
if !foundFoo {
|
||||
t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", buf)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1156,6 +1167,7 @@ func TestLeafNodeBasicAuth(t *testing.T) {
|
||||
lc = createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc.Close()
|
||||
leafSend, leafExpect := setupConnWithUserPass(t, lc, "derek", "s3cr3t!")
|
||||
leafExpect(infoRe)
|
||||
leafExpect(lsubRe)
|
||||
leafSend("PING\r\n")
|
||||
leafExpect(pongRe)
|
||||
@@ -1422,6 +1434,8 @@ func TestLeafNodeUserPermsForConnection(t *testing.T) {
|
||||
nuc.Permissions.Pub.Allow.Add("foo.>")
|
||||
nuc.Permissions.Pub.Allow.Add("baz.>")
|
||||
nuc.Permissions.Sub.Allow.Add("foo.>")
|
||||
// we would be immediately disconnected if that would not work
|
||||
nuc.Permissions.Sub.Deny.Add("$SYS.>")
|
||||
ujwt, err := nuc.Encode(akp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
|
||||
46
test/test.go
46
test/test.go
@@ -271,7 +271,7 @@ func setupConnWithUserPass(t tLogger, c net.Conn, username, password string) (se
|
||||
cs := fmt.Sprintf("CONNECT {\"verbose\":%v,\"pedantic\":%v,\"tls_required\":%v,\"protocol\":1,\"user\":%q,\"pass\":%q}\r\n",
|
||||
false, false, false, username, password)
|
||||
sendProto(t, c, cs)
|
||||
return sendCommand(t, c), expectCommand(t, c)
|
||||
return sendCommand(t, c), expectLefMostCommand(t, c)
|
||||
}
|
||||
|
||||
type sendFun func(string)
|
||||
@@ -291,6 +291,14 @@ func expectCommand(t tLogger, c net.Conn) expectFun {
|
||||
}
|
||||
}
|
||||
|
||||
// Closure version for easier reading
|
||||
func expectLefMostCommand(t tLogger, c net.Conn) expectFun {
|
||||
var buf []byte
|
||||
return func(re *regexp.Regexp) []byte {
|
||||
return expectLeftMostResult(t, c, re, &buf)
|
||||
}
|
||||
}
|
||||
|
||||
// Send the protocol command to the server.
|
||||
func sendProto(t tLogger, c net.Conn, op string) {
|
||||
n, err := c.Write([]byte(op))
|
||||
@@ -303,6 +311,7 @@ func sendProto(t tLogger, c net.Conn, op string) {
|
||||
}
|
||||
|
||||
var (
|
||||
anyRe = regexp.MustCompile(`.*`)
|
||||
infoRe = regexp.MustCompile(`INFO\s+([^\r\n]+)\r\n`)
|
||||
pingRe = regexp.MustCompile(`^PING\r\n`)
|
||||
pongRe = regexp.MustCompile(`^PONG\r\n`)
|
||||
@@ -341,6 +350,41 @@ const (
|
||||
replyAndQueueIndex = 3
|
||||
)
|
||||
|
||||
// Test result from server against regexp and return left most match
|
||||
func expectLeftMostResult(t tLogger, c net.Conn, re *regexp.Regexp, buf *[]byte) []byte {
|
||||
recv := func() []byte {
|
||||
expBuf := make([]byte, 32768)
|
||||
// Wait for commands to be processed and results queued for read
|
||||
c.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
n, err := c.Read(expBuf)
|
||||
c.SetReadDeadline(time.Time{})
|
||||
|
||||
if n <= 0 && err != nil {
|
||||
stackFatalf(t, "Error reading from conn: %v\n", err)
|
||||
}
|
||||
return expBuf[:n]
|
||||
}
|
||||
if len(*buf) == 0 {
|
||||
*buf = recv()
|
||||
}
|
||||
emptyCnt := 0
|
||||
for {
|
||||
result := re.Find(*buf)
|
||||
if result == nil {
|
||||
emptyCnt++
|
||||
if emptyCnt > 5 {
|
||||
stackFatalf(t, "Reading empty data too often\n")
|
||||
}
|
||||
*buf = append(*buf, recv()...)
|
||||
} else {
|
||||
emptyCnt = 0
|
||||
cutIdx := strings.Index(string(*buf), string(result)) + len(result)
|
||||
*buf = (*buf)[cutIdx:]
|
||||
return result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test result from server against regexp
|
||||
func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte {
|
||||
expBuf := make([]byte, 32768)
|
||||
|
||||
Reference in New Issue
Block a user