mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
@@ -2654,6 +2654,12 @@ func (c *client) canSubscribe(subject string) bool {
|
||||
if c.perms.sub.allow != nil {
|
||||
r := c.perms.sub.allow.Match(subject)
|
||||
allowed = len(r.psubs) != 0
|
||||
// Leafnodes operate slightly differently in that they allow broader scoped subjects.
|
||||
// They will prune based on publish perms before sending to a leafnode client.
|
||||
if !allowed && c.kind == LEAF && subjectHasWildcard(subject) {
|
||||
r := c.perms.sub.allow.ReverseMatch(subject)
|
||||
allowed = len(r.psubs) != 0
|
||||
}
|
||||
}
|
||||
// If we have a deny list and we think we are allowed, check that as well.
|
||||
if allowed && c.perms.sub.deny != nil {
|
||||
@@ -2667,8 +2673,7 @@ func (c *client) canSubscribe(subject string) bool {
|
||||
if allowed && c.mperms == nil && subjectHasWildcard(subject) {
|
||||
// Whip through the deny array and check if this wildcard subject is within scope.
|
||||
for _, sub := range c.darray {
|
||||
tokens := strings.Split(sub, tsep)
|
||||
if isSubsetMatch(tokens, sub) {
|
||||
if subjectIsSubsetMatch(sub, subject) {
|
||||
c.loadMsgDenyFilter()
|
||||
break
|
||||
}
|
||||
@@ -3056,23 +3061,9 @@ func (c *client) deliverMsg(sub *subscription, acc *Account, subject, reply, mh,
|
||||
|
||||
// Check if we are a leafnode and have perms to check.
|
||||
if client.kind == LEAF && client.perms != nil {
|
||||
if client.isSpokeLeafNode() {
|
||||
// `client` connection is considered a spoke, that is, it is a
|
||||
// "remote" to the other server. We check if it is allowed to
|
||||
// publish.
|
||||
if !client.pubAllowedFullCheck(string(subject), true, true) {
|
||||
client.mu.Unlock()
|
||||
client.Debugf("Not permitted to publish to %q", subject)
|
||||
return false
|
||||
}
|
||||
} else if !client.canSubscribe(string(subject)) {
|
||||
// `client` connection is considered the hub, that is, it accepted
|
||||
// the connection from the server that created a "remote" connection
|
||||
// to this server. Here, we want to check if the other side can
|
||||
// receive this message, so is it allowed to subscribe to this subject.
|
||||
|
||||
if !client.pubAllowedFullCheck(string(subject), true, true) {
|
||||
client.mu.Unlock()
|
||||
client.Debugf("Not permitted to subscribe to %q", subject)
|
||||
client.Debugf("Not permitted to deliver to %q", subject)
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -3394,14 +3385,11 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo
|
||||
if ok {
|
||||
return v.(bool)
|
||||
}
|
||||
var allowed bool
|
||||
allowed := true
|
||||
// Cache miss, check allow then deny as needed.
|
||||
if c.perms.pub.allow != nil {
|
||||
r := c.perms.pub.allow.Match(subject)
|
||||
allowed = len(r.psubs) != 0
|
||||
} else {
|
||||
// No entries means all are allowed. Deny will overrule as needed.
|
||||
allowed = true
|
||||
}
|
||||
// If we have a deny list and are currently allowed, check that as well.
|
||||
if allowed && c.perms.pub.deny != nil {
|
||||
|
||||
@@ -823,7 +823,6 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
|
||||
if remote != nil {
|
||||
// For now, if lookup fails, we will constantly try
|
||||
// to recreate this LN connection.
|
||||
|
||||
remote.Lock()
|
||||
// Users can bind to any local account, if its empty
|
||||
// we will assume the $G account.
|
||||
@@ -1368,6 +1367,18 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
c.leaf.remoteCluster = proto.Cluster
|
||||
}
|
||||
|
||||
// When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
|
||||
// behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
|
||||
if !c.isSolicitedLeafNode() && c.perms != nil {
|
||||
sp, pp := c.perms.sub, c.perms.pub
|
||||
c.perms.sub, c.perms.pub = pp, sp
|
||||
if c.opts.Import != nil {
|
||||
c.darray = c.opts.Import.Deny
|
||||
} else {
|
||||
c.darray = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check for JetStream domain
|
||||
jsConfigured := c.acc.jetStreamConfigured()
|
||||
doDomainMappings := opts.JetStreamDomain != _EMPTY_ && c.acc != sysAcc && jsConfigured
|
||||
@@ -1375,6 +1386,13 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
// If we have JS enabled and the other side does as well we need to add in an import deny clause.
|
||||
if jsConfigured && proto.JetStream {
|
||||
c.mergePubDenyPermissions([]string{jsAllAPI})
|
||||
// We need to send this back to the other side.
|
||||
if c.isHubLeafNode() {
|
||||
if c.opts.Import == nil {
|
||||
c.opts.Import = &SubjectPermission{}
|
||||
}
|
||||
c.opts.Import.Deny = append(c.opts.Import.Deny, jsAllAPI)
|
||||
}
|
||||
}
|
||||
|
||||
// Set the Ping timer
|
||||
@@ -1455,6 +1473,12 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
|
||||
acc.mu.Lock()
|
||||
accName := acc.Name
|
||||
accNTag := acc.nameTag
|
||||
|
||||
// To make printing look better when no friendly name present.
|
||||
if accNTag != _EMPTY_ {
|
||||
accNTag = "/" + accNTag
|
||||
}
|
||||
|
||||
// If we are solicited we only send interest for local clients.
|
||||
if c.isSpokeLeafNode() {
|
||||
acc.sl.localSubs(&subs)
|
||||
@@ -1468,8 +1492,8 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
|
||||
// Since leaf nodes only send on interest, if the bound
|
||||
// account has import services we need to send those over.
|
||||
for isubj := range acc.imports.services {
|
||||
if !c.canSubscribe(isubj) {
|
||||
c.Debugf("Not permitted to import service %s on behalf of %s/%s", isubj, accName, accNTag)
|
||||
if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
|
||||
c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
|
||||
continue
|
||||
}
|
||||
ims = append(ims, isubj)
|
||||
@@ -1517,8 +1541,9 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
|
||||
rc := c.leaf.remoteCluster
|
||||
c.leaf.smap = make(map[string]int32)
|
||||
for _, sub := range subs {
|
||||
if !c.canSubscribe(string(sub.subject)) {
|
||||
c.Debugf("Not permitted to subscribe to %s on behalf of %s/%s", string(sub.subject), accName, accNTag)
|
||||
subj := string(sub.subject)
|
||||
if c.isSpokeLeafNode() && !c.canSubscribe(subj) {
|
||||
c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", subj, accName, accNTag)
|
||||
continue
|
||||
}
|
||||
// We ignore ourselves here.
|
||||
@@ -1608,9 +1633,7 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
|
||||
for _, ln := range leafs {
|
||||
// Check to make sure this sub does not have an origin cluster than matches the leafnode.
|
||||
ln.mu.Lock()
|
||||
skip := sub.origin != nil && string(sub.origin) == ln.remoteCluster()
|
||||
// do not skip on !ln.canSubscribe(string(sub.subject))
|
||||
// Given allow:foo, > would be rejected. For leaf nodes filtering is done on the (soliciting) end.
|
||||
skip := (sub.origin != nil && string(sub.origin) == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject))
|
||||
ln.mu.Unlock()
|
||||
if skip {
|
||||
continue
|
||||
@@ -1724,7 +1747,7 @@ func keyFromSub(sub *subscription) string {
|
||||
|
||||
// Lock should be held.
|
||||
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
|
||||
if key == "" {
|
||||
if key == _EMPTY_ {
|
||||
return
|
||||
}
|
||||
if n > 0 {
|
||||
@@ -1807,7 +1830,9 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
checkPerms = false
|
||||
}
|
||||
}
|
||||
if checkPerms && c.isHubLeafNode() && !c.canSubscribe(string(sub.subject)) {
|
||||
|
||||
// If we are a hub check that we can publish to this subject.
|
||||
if checkPerms && subjectIsLiteral(string(sub.subject)) && !c.pubAllowedFullCheck(string(sub.subject), true, true) {
|
||||
c.mu.Unlock()
|
||||
c.leafSubPermViolation(sub.subject)
|
||||
return nil
|
||||
@@ -2449,7 +2474,7 @@ func (s *Server) leafNodeResumeConnectProcess(c *client) {
|
||||
c.Debugf("Remote leafnode connect msg sent")
|
||||
}
|
||||
|
||||
// This is invoked for remote LEAF remote connections after processing the INFO
|
||||
// This is invoked for remote LEAF connections after processing the INFO
|
||||
// protocol and leafNodeResumeConnectProcess.
|
||||
// This will send LS+ the CONNECT protocol and register the leaf node.
|
||||
func (s *Server) leafNodeFinishConnectProcess(c *client) {
|
||||
|
||||
@@ -1472,8 +1472,8 @@ func TestLeafNodeExportPermissionsNotForSpecialSubs(t *testing.T) {
|
||||
// The deny is totally restrictive, but make sure that we still accept the $LDS, $GR and _GR_ go from LN1.
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
// We should have registered the 3 subs from the accepting leafnode.
|
||||
if n := ln2.globalAccount().TotalSubs(); n != 7 {
|
||||
return fmt.Errorf("Expected %d subs, got %v", 7, n)
|
||||
if n := ln2.globalAccount().TotalSubs(); n != 5 {
|
||||
return fmt.Errorf("Expected %d subs, got %v", 5, n)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
@@ -3930,6 +3930,7 @@ leafnodes:{
|
||||
defer ncL.Close()
|
||||
|
||||
test := func(subject string, cSub, cPub *nats.Conn, remoteServerForSub *Server, accName string, pass bool) {
|
||||
t.Helper()
|
||||
sub, err := cSub.SubscribeSync(subject)
|
||||
require_NoError(t, err)
|
||||
require_NoError(t, cSub.Flush())
|
||||
@@ -3961,6 +3962,6 @@ leafnodes:{
|
||||
test("pub", ncA, ncL, sL, "A", true)
|
||||
})
|
||||
t.Run("pub-on-ln-fail", func(t *testing.T) {
|
||||
test("pubdeny", ncA, ncL, sL, "A", false)
|
||||
test("pubdeny", ncA, ncL, nil, "A", false)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1488,10 +1488,10 @@ func (s *Sublist) ReverseMatch(subject string) *SublistResult {
|
||||
}
|
||||
|
||||
func reverseMatchLevel(l *level, toks []string, n *node, results *SublistResult) {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
for i, t := range toks {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
if len(t) == 1 {
|
||||
if t[0] == fwc {
|
||||
getAllNodes(l, results)
|
||||
@@ -1518,6 +1518,12 @@ func getAllNodes(l *level, results *SublistResult) {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
if l.pwc != nil {
|
||||
addNodeToResults(l.pwc, results)
|
||||
}
|
||||
if l.fwc != nil {
|
||||
addNodeToResults(l.fwc, results)
|
||||
}
|
||||
for _, n := range l.nodes {
|
||||
addNodeToResults(n, results)
|
||||
getAllNodes(n.next, results)
|
||||
|
||||
Reference in New Issue
Block a user