mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
[FIXED] Allow response permissions to work across accounts
This is a port of https://github.com/nats-io/nats-server/pull/1487 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -2408,7 +2408,7 @@ var needFlush = struct{}{}
|
||||
|
||||
// deliverMsg will deliver a message to a matching subscription and its underlying client.
|
||||
// We process all connection/client types. mh is the part that will be protocol/client specific.
|
||||
func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte, gwrply bool) bool {
|
||||
func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, gwrply bool) bool {
|
||||
if sub.client == nil {
|
||||
return false
|
||||
}
|
||||
@@ -2548,8 +2548,8 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte, gwrply b
|
||||
|
||||
// If we are tracking dynamic publish permissions that track reply subjects,
|
||||
// do that accounting here. We only look at client.replies which will be non-nil.
|
||||
if client.replies != nil && len(c.pa.reply) > 0 {
|
||||
client.replies[string(c.pa.reply)] = &resp{time.Now(), 0}
|
||||
if client.replies != nil && len(reply) > 0 {
|
||||
client.replies[string(reply)] = &resp{time.Now(), 0}
|
||||
if len(client.replies) > replyPermLimit {
|
||||
client.pruneReplyPerms()
|
||||
}
|
||||
@@ -3095,7 +3095,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
}
|
||||
// Normal delivery
|
||||
mh := c.msgHeader(msgh[:si], sub, creply)
|
||||
c.deliverMsg(sub, subject, mh, msg, rplyHasGWPrefix)
|
||||
c.deliverMsg(sub, subject, creply, mh, msg, rplyHasGWPrefix)
|
||||
}
|
||||
|
||||
// Set these up to optionally filter based on the queue lists.
|
||||
@@ -3207,7 +3207,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
|
||||
// "rreply" will be stripped of the $GNR prefix (if present)
|
||||
// for client connections only.
|
||||
mh := c.msgHeader(msgh[:si], sub, rreply)
|
||||
if c.deliverMsg(sub, subject, mh, msg, rplyHasGWPrefix) {
|
||||
if c.deliverMsg(sub, subject, rreply, mh, msg, rplyHasGWPrefix) {
|
||||
// Clear rsub
|
||||
rsub = nil
|
||||
if flags&pmrCollectQueueNames != 0 {
|
||||
@@ -3271,7 +3271,7 @@ sendToRoutesOrLeafs:
|
||||
}
|
||||
mh = append(mh, c.pa.szb...)
|
||||
mh = append(mh, _CRLF_...)
|
||||
c.deliverMsg(rt.sub, subject, mh, msg, false)
|
||||
c.deliverMsg(rt.sub, subject, reply, mh, msg, false)
|
||||
}
|
||||
return queues
|
||||
}
|
||||
|
||||
@@ -2480,7 +2480,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
|
||||
sub.nm, sub.max = 0, 0
|
||||
sub.client = gwc
|
||||
sub.subject = subject
|
||||
c.deliverMsg(sub, subject, mh, msg, false)
|
||||
c.deliverMsg(sub, subject, mreply, mh, msg, false)
|
||||
}
|
||||
// Done with subscription, put back to pool. We don't need
|
||||
// to reset content since we explicitly set when using it.
|
||||
|
||||
72
test/services_test.go
Normal file
72
test/services_test.go
Normal file
@@ -0,0 +1,72 @@
|
||||
// Copyright 2020 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
func TestServiceAllowResponsesPerms(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
listen: 127.0.0.1:-1
|
||||
accounts: {
|
||||
A: {
|
||||
users: [ {user: a, password: pwd, permissions = {subscribe=foo, allow_responses=true}} ]
|
||||
exports: [ {service: "foo"} ]
|
||||
},
|
||||
B: {
|
||||
users: [{user: b, password: pwd} ]
|
||||
imports: [ {service: { account: A, subject: "foo"}} ]
|
||||
}
|
||||
}
|
||||
`))
|
||||
defer os.Remove(conf)
|
||||
|
||||
srv, opts := RunServerWithConfig(conf)
|
||||
defer srv.Shutdown()
|
||||
|
||||
// Responder.
|
||||
nc, err := nats.Connect(fmt.Sprintf("nats://a:pwd@%s:%d", opts.Host, opts.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
reply := []byte("Hello")
|
||||
// Respond with 5ms gaps for total response time for all chunks and EOF > 50ms.
|
||||
nc.Subscribe("foo", func(msg *nats.Msg) {
|
||||
msg.Respond(reply)
|
||||
})
|
||||
nc.Flush()
|
||||
|
||||
// Now setup requester.
|
||||
nc2, err := nats.Connect(fmt.Sprintf("nats://b:pwd@%s:%d", opts.Host, opts.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc2.Close()
|
||||
|
||||
resp, err := nc2.Request("foo", []byte("help"), time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error expecting response %v", err)
|
||||
}
|
||||
if !bytes.Equal(resp.Data, reply) {
|
||||
t.Fatalf("Did not get correct response, %q vs %q", resp.Data, reply)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user