mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Properly handle leafnode spoke permissions.
When a leafnode would connect with credentials that had permissions the spoke did not have a way of knowing what those were. This could lead to being disconnected when sending subscriptions or messages to the hub which were not allowed. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -808,7 +808,7 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// We are here if we accept leafnode connections without any credential.
|
||||
// We are here if we accept leafnode connections without any credentials.
|
||||
|
||||
// Still, if the CONNECT has some user info, we will bind to the
|
||||
// user's account or to the specified default account (if provided)
|
||||
|
||||
@@ -440,7 +440,7 @@ type clientOpts struct {
|
||||
Headers bool `json:"headers,omitempty"`
|
||||
NoResponders bool `json:"no_responders,omitempty"`
|
||||
|
||||
// Routes only
|
||||
// Routes and Leafnodes only
|
||||
Import *SubjectPermission `json:"import,omitempty"`
|
||||
Export *SubjectPermission `json:"export,omitempty"`
|
||||
}
|
||||
@@ -748,6 +748,15 @@ func (c *client) setPermissions(perms *Permissions) {
|
||||
c.perms.sub.deny.Insert(sub)
|
||||
}
|
||||
}
|
||||
|
||||
// If we are a leafnode and we are the hub copy the extracted perms
|
||||
// to resend back to soliciting server. These are reversed from the
|
||||
// way routes interpret them since this is how the soliciting server
|
||||
// will receive these back in an update INFO.
|
||||
if c.isHubLeafNode() {
|
||||
c.opts.Import = perms.Subscribe
|
||||
c.opts.Export = perms.Publish
|
||||
}
|
||||
}
|
||||
|
||||
// Check to see if we have an expiration for the user JWT via base claims.
|
||||
@@ -2698,6 +2707,14 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte, gwrply b
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if we are a spoke leafnode and have perms to check.
|
||||
if client.isSpokeLeafNode() && client.perms != nil {
|
||||
if !client.pubAllowed(string(subject)) {
|
||||
client.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
srv := client.srv
|
||||
|
||||
sub.nm++
|
||||
@@ -3803,7 +3820,7 @@ func (c *client) typeString() string {
|
||||
case GATEWAY:
|
||||
return "Gateway"
|
||||
case LEAF:
|
||||
return "LeafNode"
|
||||
return "Leafnode"
|
||||
case JETSTREAM:
|
||||
return "JetStream"
|
||||
case ACCOUNT:
|
||||
|
||||
@@ -907,6 +907,14 @@ func (c *client) processLeafnodeInfo(info *Info) error {
|
||||
c.updateLeafNodeURLs(info)
|
||||
}
|
||||
|
||||
// Check to see if we have permissions updates here.
|
||||
if info.Import != nil || info.Export != nil {
|
||||
c.setPermissions(&Permissions{
|
||||
Publish: info.Export,
|
||||
Subscribe: info.Import,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1057,6 +1065,10 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
c.leaf.isSpoke = true
|
||||
}
|
||||
|
||||
// If we have permissions bound to this leafnode we need to send then back to the
|
||||
// origin server for local enforcement.
|
||||
s.sendPermsInfo(c)
|
||||
|
||||
// Create and initialize the smap since we know our bound account now.
|
||||
// This will send all registered subs too.
|
||||
s.initLeafNodeSmapAndSendSubs(c)
|
||||
@@ -1071,6 +1083,24 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sends back an info block to the soliciting leafnode to let it know about
|
||||
// its permission settings for local enforcement.
|
||||
func (s *Server) sendPermsInfo(c *client) {
|
||||
if c.perms == nil {
|
||||
return
|
||||
}
|
||||
// Copy
|
||||
info := s.copyLeafNodeInfo()
|
||||
c.mu.Lock()
|
||||
info.CID = c.cid
|
||||
info.Import = c.opts.Import
|
||||
info.Export = c.opts.Export
|
||||
b, _ := json.Marshal(info)
|
||||
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
|
||||
c.enqueueProto(bytes.Join(pcs, []byte(" ")))
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Snapshot the current subscriptions from the sublist into our smap which
|
||||
// we will keep updated from now on.
|
||||
// Also send the registered subscriptions.
|
||||
@@ -1277,6 +1307,21 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
|
||||
// Send the subscription interest change to the other side.
|
||||
// Lock should be held.
|
||||
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
|
||||
// If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
|
||||
if c.isSpokeLeafNode() {
|
||||
checkPerms := true
|
||||
if len(key) > 0 && key[0] == '$' || key[0] == '_' {
|
||||
if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
|
||||
strings.HasPrefix(key, oldGWReplyPrefix) ||
|
||||
strings.HasPrefix(key, gwReplyPrefix) {
|
||||
checkPerms = false
|
||||
}
|
||||
}
|
||||
if checkPerms && !c.canSubscribe(key) {
|
||||
return
|
||||
}
|
||||
}
|
||||
// If we are here we can send over to the other side.
|
||||
_b := [64]byte{}
|
||||
b := bytes.NewBuffer(_b[:0])
|
||||
c.writeLeafSub(b, key, n)
|
||||
@@ -1385,7 +1430,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
checkPerms = false
|
||||
}
|
||||
}
|
||||
if checkPerms && !c.canExport(string(sub.subject)) {
|
||||
if checkPerms && c.isHubLeafNode() && !c.canSubscribe(string(sub.subject)) {
|
||||
c.mu.Unlock()
|
||||
c.leafSubPermViolation(sub.subject)
|
||||
return nil
|
||||
@@ -1662,7 +1707,7 @@ func (c *client) processInboundLeafMsg(msg []byte) {
|
||||
c.in.bytes += int32(len(msg) - LEN_CR_LF)
|
||||
|
||||
// Check pub permissions
|
||||
if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowed(string(c.pa.subject)) {
|
||||
if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && c.isHubLeafNode() && !c.pubAllowed(string(c.pa.subject)) {
|
||||
c.leafPubPermViolation(c.pa.subject)
|
||||
return
|
||||
}
|
||||
@@ -1745,6 +1790,12 @@ func (c *client) leafSubPermViolation(subj []byte) {
|
||||
// Sends the permission violation error to the remote, logs it and closes the connection.
|
||||
// If this is from a server soliciting, the reconnection will be delayed.
|
||||
func (c *client) leafPermViolation(pub bool, subj []byte) {
|
||||
if c.isSpokeLeafNode() {
|
||||
// For spokes these are no-ops since the hub server told us our permissions.
|
||||
// We just need to not send these over to the other side since we will get cutoff.
|
||||
return
|
||||
}
|
||||
// FIXME(dlc) ?
|
||||
c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
|
||||
var action string
|
||||
if pub {
|
||||
|
||||
@@ -89,6 +89,22 @@ func checkSubInterest(t *testing.T, s *server.Server, accName, subject string, t
|
||||
})
|
||||
}
|
||||
|
||||
func checkNoSubInterest(t *testing.T, s *server.Server, accName, subject string, timeout time.Duration) {
|
||||
t.Helper()
|
||||
acc, err := s.LookupAccount(accName)
|
||||
if err != nil {
|
||||
t.Fatalf("error looking up account %q: %v", accName, err)
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
for time.Now().Before(start.Add(timeout)) {
|
||||
if acc.SubscriptionInterest(subject) {
|
||||
t.Fatalf("Did not expect interest for %q", subject)
|
||||
}
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
func runThreeServers(t *testing.T) (srvA, srvB, srvC *server.Server, optsA, optsB, optsC *server.Options) {
|
||||
srvA, optsA = RunServerWithConfig("./configs/srv_a.conf")
|
||||
srvB, optsB = RunServerWithConfig("./configs/srv_b.conf")
|
||||
|
||||
@@ -1364,6 +1364,73 @@ func TestLeafNodeOperatorModel(t *testing.T) {
|
||||
checkLeafNodeConnected(t, s)
|
||||
}
|
||||
|
||||
func TestLeafNodeUserPermsForConnection(t *testing.T) {
|
||||
s, opts, conf := runLeafNodeOperatorServer(t)
|
||||
defer os.Remove(conf)
|
||||
defer s.Shutdown()
|
||||
|
||||
// Setup account and a user that will be used by the remote leaf node server.
|
||||
// createAccount automatically registers with resolver etc..
|
||||
acc, akp := createAccount(t, s)
|
||||
kp, _ := nkeys.CreateUser()
|
||||
pub, _ := kp.PublicKey()
|
||||
nuc := jwt.NewUserClaims(pub)
|
||||
nuc.Permissions.Pub.Allow.Add("foo.>")
|
||||
nuc.Permissions.Pub.Allow.Add("baz.>")
|
||||
nuc.Permissions.Sub.Allow.Add("foo.>")
|
||||
ujwt, err := nuc.Encode(akp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
seed, _ := kp.Seed()
|
||||
mycreds := genCredsFile(t, ujwt, seed)
|
||||
defer os.Remove(mycreds)
|
||||
|
||||
sl, _, lnconf := runSolicitWithCredentials(t, opts, mycreds)
|
||||
defer os.Remove(lnconf)
|
||||
defer sl.Shutdown()
|
||||
|
||||
checkLeafNodeConnected(t, s)
|
||||
|
||||
// Create credentials for a normal unrestricted user that we will connect to the op server.
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
nc, err := nats.Connect(url, createUserCreds(t, s, akp))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
// Create a user on the leafnode server that solicited.
|
||||
nc2, err := nats.Connect(sl.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc2.Close()
|
||||
|
||||
// Make sure subscriptions properly do or do not make it to the hub.
|
||||
// Note that all hub subscriptions will make it to the leafnode.
|
||||
nc2.SubscribeSync("bar")
|
||||
checkNoSubInterest(t, s, acc.GetName(), "bar", 20*time.Millisecond)
|
||||
// This one should.
|
||||
nc2.SubscribeSync("foo.22")
|
||||
checkSubInterest(t, s, acc.GetName(), "foo.22", 20*time.Millisecond)
|
||||
|
||||
// Capture everything.
|
||||
sub, _ := nc.SubscribeSync(">")
|
||||
nc.Flush()
|
||||
|
||||
// Now check local pubs are not forwarded.
|
||||
nc2.Publish("baz.22", nil)
|
||||
m, err := sub.NextMsg(1 * time.Second)
|
||||
if err != nil || m.Subject != "baz.22" {
|
||||
t.Fatalf("Expected to received this message")
|
||||
}
|
||||
nc2.Publish("bar.22", nil)
|
||||
if _, err := sub.NextMsg(100 * time.Millisecond); err == nil {
|
||||
t.Fatalf("Did not expect to receive this message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeMultipleAccounts(t *testing.T) {
|
||||
// So we will create a main server with two accounts. The remote server, acting as a leaf node, will simply have
|
||||
// the $G global account and no auth. Make sure things work properly here.
|
||||
|
||||
Reference in New Issue
Block a user