From 6e3401eb8148e4a152e36ae613ced962a75fb828 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 27 Aug 2020 14:33:53 -0600 Subject: [PATCH] [FIXED] Allow response permissions to work across accounts This is a port of https://github.com/nats-io/nats-server/pull/1487 Signed-off-by: Ivan Kozlovic --- server/client.go | 12 ++++---- server/gateway.go | 2 +- test/services_test.go | 72 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 7 deletions(-) create mode 100644 test/services_test.go diff --git a/server/client.go b/server/client.go index 7d7a1ad3..e19c16c8 100644 --- a/server/client.go +++ b/server/client.go @@ -2408,7 +2408,7 @@ var needFlush = struct{}{} // deliverMsg will deliver a message to a matching subscription and its underlying client. // We process all connection/client types. mh is the part that will be protocol/client specific. -func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte, gwrply bool) bool { +func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, gwrply bool) bool { if sub.client == nil { return false } @@ -2548,8 +2548,8 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte, gwrply b // If we are tracking dynamic publish permissions that track reply subjects, // do that accounting here. We only look at client.replies which will be non-nil. - if client.replies != nil && len(c.pa.reply) > 0 { - client.replies[string(c.pa.reply)] = &resp{time.Now(), 0} + if client.replies != nil && len(reply) > 0 { + client.replies[string(reply)] = &resp{time.Now(), 0} if len(client.replies) > replyPermLimit { client.pruneReplyPerms() } @@ -3095,7 +3095,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, } // Normal delivery mh := c.msgHeader(msgh[:si], sub, creply) - c.deliverMsg(sub, subject, mh, msg, rplyHasGWPrefix) + c.deliverMsg(sub, subject, creply, mh, msg, rplyHasGWPrefix) } // Set these up to optionally filter based on the queue lists. @@ -3207,7 +3207,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, // "rreply" will be stripped of the $GNR prefix (if present) // for client connections only. mh := c.msgHeader(msgh[:si], sub, rreply) - if c.deliverMsg(sub, subject, mh, msg, rplyHasGWPrefix) { + if c.deliverMsg(sub, subject, rreply, mh, msg, rplyHasGWPrefix) { // Clear rsub rsub = nil if flags&pmrCollectQueueNames != 0 { @@ -3271,7 +3271,7 @@ sendToRoutesOrLeafs: } mh = append(mh, c.pa.szb...) mh = append(mh, _CRLF_...) - c.deliverMsg(rt.sub, subject, mh, msg, false) + c.deliverMsg(rt.sub, subject, reply, mh, msg, false) } return queues } diff --git a/server/gateway.go b/server/gateway.go index 7e7636b4..301e0d2f 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2480,7 +2480,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr sub.nm, sub.max = 0, 0 sub.client = gwc sub.subject = subject - c.deliverMsg(sub, subject, mh, msg, false) + c.deliverMsg(sub, subject, mreply, mh, msg, false) } // Done with subscription, put back to pool. We don't need // to reset content since we explicitly set when using it. diff --git a/test/services_test.go b/test/services_test.go new file mode 100644 index 00000000..636c2a94 --- /dev/null +++ b/test/services_test.go @@ -0,0 +1,72 @@ +// Copyright 2020 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package test + +import ( + "bytes" + "fmt" + "os" + "testing" + "time" + + "github.com/nats-io/nats.go" +) + +func TestServiceAllowResponsesPerms(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + accounts: { + A: { + users: [ {user: a, password: pwd, permissions = {subscribe=foo, allow_responses=true}} ] + exports: [ {service: "foo"} ] + }, + B: { + users: [{user: b, password: pwd} ] + imports: [ {service: { account: A, subject: "foo"}} ] + } + } + `)) + defer os.Remove(conf) + + srv, opts := RunServerWithConfig(conf) + defer srv.Shutdown() + + // Responder. + nc, err := nats.Connect(fmt.Sprintf("nats://a:pwd@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + reply := []byte("Hello") + // Respond with 5ms gaps for total response time for all chunks and EOF > 50ms. + nc.Subscribe("foo", func(msg *nats.Msg) { + msg.Respond(reply) + }) + nc.Flush() + + // Now setup requester. + nc2, err := nats.Connect(fmt.Sprintf("nats://b:pwd@%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + resp, err := nc2.Request("foo", []byte("help"), time.Second) + if err != nil { + t.Fatalf("Error expecting response %v", err) + } + if !bytes.Equal(resp.Data, reply) { + t.Fatalf("Did not get correct response, %q vs %q", resp.Data, reply) + } +}