From ca4f03c1a6289941b5685566ceb200006c2170a7 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 16 Jun 2020 08:33:09 -0700 Subject: [PATCH] Properly handle leafnode spoke permissions. When a leafnode would connect with credentials that had permissions the spoke did not have a way of knowing what those were. This could lead to being disconnected when sending subscriptions or messages to the hub which were not allowed. Signed-off-by: Derek Collison --- server/auth.go | 2 +- server/client.go | 21 ++++++++++++-- server/leafnode.go | 55 +++++++++++++++++++++++++++++++++-- test/cluster_test.go | 16 +++++++++++ test/leafnode_test.go | 67 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 156 insertions(+), 5 deletions(-) diff --git a/server/auth.go b/server/auth.go index 9a568453..ad9de95b 100644 --- a/server/auth.go +++ b/server/auth.go @@ -808,7 +808,7 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool { return false } - // We are here if we accept leafnode connections without any credential. + // We are here if we accept leafnode connections without any credentials. // Still, if the CONNECT has some user info, we will bind to the // user's account or to the specified default account (if provided) diff --git a/server/client.go b/server/client.go index 6d1728a1..aa6ddda2 100644 --- a/server/client.go +++ b/server/client.go @@ -440,7 +440,7 @@ type clientOpts struct { Headers bool `json:"headers,omitempty"` NoResponders bool `json:"no_responders,omitempty"` - // Routes only + // Routes and Leafnodes only Import *SubjectPermission `json:"import,omitempty"` Export *SubjectPermission `json:"export,omitempty"` } @@ -748,6 +748,15 @@ func (c *client) setPermissions(perms *Permissions) { c.perms.sub.deny.Insert(sub) } } + + // If we are a leafnode and we are the hub copy the extracted perms + // to resend back to soliciting server. These are reversed from the + // way routes interpret them since this is how the soliciting server + // will receive these back in an update INFO. + if c.isHubLeafNode() { + c.opts.Import = perms.Subscribe + c.opts.Export = perms.Publish + } } // Check to see if we have an expiration for the user JWT via base claims. @@ -2698,6 +2707,14 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte, gwrply b return false } + // Check if we are a spoke leafnode and have perms to check. + if client.isSpokeLeafNode() && client.perms != nil { + if !client.pubAllowed(string(subject)) { + client.mu.Unlock() + return false + } + } + srv := client.srv sub.nm++ @@ -3803,7 +3820,7 @@ func (c *client) typeString() string { case GATEWAY: return "Gateway" case LEAF: - return "LeafNode" + return "Leafnode" case JETSTREAM: return "JetStream" case ACCOUNT: diff --git a/server/leafnode.go b/server/leafnode.go index 8ccbeacf..e9f4872b 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -907,6 +907,14 @@ func (c *client) processLeafnodeInfo(info *Info) error { c.updateLeafNodeURLs(info) } + // Check to see if we have permissions updates here. + if info.Import != nil || info.Export != nil { + c.setPermissions(&Permissions{ + Publish: info.Export, + Subscribe: info.Import, + }) + } + return nil } @@ -1057,6 +1065,10 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro c.leaf.isSpoke = true } + // If we have permissions bound to this leafnode we need to send then back to the + // origin server for local enforcement. + s.sendPermsInfo(c) + // Create and initialize the smap since we know our bound account now. // This will send all registered subs too. s.initLeafNodeSmapAndSendSubs(c) @@ -1071,6 +1083,24 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return nil } +// Sends back an info block to the soliciting leafnode to let it know about +// its permission settings for local enforcement. +func (s *Server) sendPermsInfo(c *client) { + if c.perms == nil { + return + } + // Copy + info := s.copyLeafNodeInfo() + c.mu.Lock() + info.CID = c.cid + info.Import = c.opts.Import + info.Export = c.opts.Export + b, _ := json.Marshal(info) + pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)} + c.enqueueProto(bytes.Join(pcs, []byte(" "))) + c.mu.Unlock() +} + // Snapshot the current subscriptions from the sublist into our smap which // we will keep updated from now on. // Also send the registered subscriptions. @@ -1277,6 +1307,21 @@ func (c *client) updateSmap(sub *subscription, delta int32) { // Send the subscription interest change to the other side. // Lock should be held. func (c *client) sendLeafNodeSubUpdate(key string, n int32) { + // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub. + if c.isSpokeLeafNode() { + checkPerms := true + if len(key) > 0 && key[0] == '$' || key[0] == '_' { + if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) || + strings.HasPrefix(key, oldGWReplyPrefix) || + strings.HasPrefix(key, gwReplyPrefix) { + checkPerms = false + } + } + if checkPerms && !c.canSubscribe(key) { + return + } + } + // If we are here we can send over to the other side. _b := [64]byte{} b := bytes.NewBuffer(_b[:0]) c.writeLeafSub(b, key, n) @@ -1385,7 +1430,7 @@ func (c *client) processLeafSub(argo []byte) (err error) { checkPerms = false } } - if checkPerms && !c.canExport(string(sub.subject)) { + if checkPerms && c.isHubLeafNode() && !c.canSubscribe(string(sub.subject)) { c.mu.Unlock() c.leafSubPermViolation(sub.subject) return nil @@ -1662,7 +1707,7 @@ func (c *client) processInboundLeafMsg(msg []byte) { c.in.bytes += int32(len(msg) - LEN_CR_LF) // Check pub permissions - if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowed(string(c.pa.subject)) { + if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && c.isHubLeafNode() && !c.pubAllowed(string(c.pa.subject)) { c.leafPubPermViolation(c.pa.subject) return } @@ -1745,6 +1790,12 @@ func (c *client) leafSubPermViolation(subj []byte) { // Sends the permission violation error to the remote, logs it and closes the connection. // If this is from a server soliciting, the reconnection will be delayed. func (c *client) leafPermViolation(pub bool, subj []byte) { + if c.isSpokeLeafNode() { + // For spokes these are no-ops since the hub server told us our permissions. + // We just need to not send these over to the other side since we will get cutoff. + return + } + // FIXME(dlc) ? c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation) var action string if pub { diff --git a/test/cluster_test.go b/test/cluster_test.go index 1a0dd7b8..abc43bf4 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -89,6 +89,22 @@ func checkSubInterest(t *testing.T, s *server.Server, accName, subject string, t }) } +func checkNoSubInterest(t *testing.T, s *server.Server, accName, subject string, timeout time.Duration) { + t.Helper() + acc, err := s.LookupAccount(accName) + if err != nil { + t.Fatalf("error looking up account %q: %v", accName, err) + } + + start := time.Now() + for time.Now().Before(start.Add(timeout)) { + if acc.SubscriptionInterest(subject) { + t.Fatalf("Did not expect interest for %q", subject) + } + time.Sleep(5 * time.Millisecond) + } +} + func runThreeServers(t *testing.T) (srvA, srvB, srvC *server.Server, optsA, optsB, optsC *server.Options) { srvA, optsA = RunServerWithConfig("./configs/srv_a.conf") srvB, optsB = RunServerWithConfig("./configs/srv_b.conf") diff --git a/test/leafnode_test.go b/test/leafnode_test.go index b19fb010..5a340248 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -1364,6 +1364,73 @@ func TestLeafNodeOperatorModel(t *testing.T) { checkLeafNodeConnected(t, s) } +func TestLeafNodeUserPermsForConnection(t *testing.T) { + s, opts, conf := runLeafNodeOperatorServer(t) + defer os.Remove(conf) + defer s.Shutdown() + + // Setup account and a user that will be used by the remote leaf node server. + // createAccount automatically registers with resolver etc.. + acc, akp := createAccount(t, s) + kp, _ := nkeys.CreateUser() + pub, _ := kp.PublicKey() + nuc := jwt.NewUserClaims(pub) + nuc.Permissions.Pub.Allow.Add("foo.>") + nuc.Permissions.Pub.Allow.Add("baz.>") + nuc.Permissions.Sub.Allow.Add("foo.>") + ujwt, err := nuc.Encode(akp) + if err != nil { + t.Fatalf("Error generating user JWT: %v", err) + } + seed, _ := kp.Seed() + mycreds := genCredsFile(t, ujwt, seed) + defer os.Remove(mycreds) + + sl, _, lnconf := runSolicitWithCredentials(t, opts, mycreds) + defer os.Remove(lnconf) + defer sl.Shutdown() + + checkLeafNodeConnected(t, s) + + // Create credentials for a normal unrestricted user that we will connect to the op server. + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + nc, err := nats.Connect(url, createUserCreds(t, s, akp)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + // Create a user on the leafnode server that solicited. + nc2, err := nats.Connect(sl.ClientURL()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + // Make sure subscriptions properly do or do not make it to the hub. + // Note that all hub subscriptions will make it to the leafnode. + nc2.SubscribeSync("bar") + checkNoSubInterest(t, s, acc.GetName(), "bar", 20*time.Millisecond) + // This one should. + nc2.SubscribeSync("foo.22") + checkSubInterest(t, s, acc.GetName(), "foo.22", 20*time.Millisecond) + + // Capture everything. + sub, _ := nc.SubscribeSync(">") + nc.Flush() + + // Now check local pubs are not forwarded. + nc2.Publish("baz.22", nil) + m, err := sub.NextMsg(1 * time.Second) + if err != nil || m.Subject != "baz.22" { + t.Fatalf("Expected to received this message") + } + nc2.Publish("bar.22", nil) + if _, err := sub.NextMsg(100 * time.Millisecond); err == nil { + t.Fatalf("Did not expect to receive this message") + } +} + func TestLeafNodeMultipleAccounts(t *testing.T) { // So we will create a main server with two accounts. The remote server, acting as a leaf node, will simply have // the $G global account and no auth. Make sure things work properly here.