Map anonymous reply subjects

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2018-09-21 16:49:27 -07:00
parent f6cb706c68
commit c4bcbf6275
3 changed files with 78 additions and 36 deletions

View File

@@ -14,6 +14,7 @@
package server
import (
"fmt"
"os"
"strings"
"testing"
@@ -69,7 +70,7 @@ func TestAccountIsolation(t *testing.T) {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q\n", l)
t.Fatalf("PONG response incorrect: %q", l)
}
go cfoo.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n"))
@@ -83,7 +84,7 @@ func TestAccountIsolation(t *testing.T) {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crFoo, []byte("hello\r\n"), t)
@@ -93,7 +94,7 @@ func TestAccountIsolation(t *testing.T) {
t.Fatalf("Error for client 'bar' from server: %v", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %q\n", l)
t.Fatalf("PONG response incorrect: %q", l)
}
}
@@ -190,7 +191,7 @@ func TestAccountSimpleConfig(t *testing.T) {
t.Fatalf("Received an error processing config file: %v", err)
}
if la := len(opts.Accounts); la != 2 {
t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la)
t.Fatalf("Expected to see 2 accounts in opts, got %d", la)
}
if !accountNameExists("foo", opts.Accounts) {
t.Fatal("Expected a 'foo' account")
@@ -232,11 +233,11 @@ func TestAccountParseConfig(t *testing.T) {
}
if la := len(opts.Accounts); la != 2 {
t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la)
t.Fatalf("Expected to see 2 accounts in opts, got %d", la)
}
if lu := len(opts.Users); lu != 4 {
t.Fatalf("Expected 4 total Users, got %d\n", lu)
t.Fatalf("Expected 4 total Users, got %d", lu)
}
var natsAcc *Account
@@ -391,10 +392,10 @@ func TestSimpleMapping(t *testing.T) {
}
matches := mraw[0]
if matches[SUB_INDEX] != "import.foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != sid {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
}
@@ -459,10 +460,10 @@ func TestNoPrefixWildcardMapping(t *testing.T) {
}
matches := mraw[0]
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("hello\r\n"), t)
}
@@ -512,10 +513,10 @@ func TestPrefixWildcardMapping(t *testing.T) {
}
matches := mraw[0]
if matches[SUB_INDEX] != "pub.imports.foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("hello\r\n"), t)
}
@@ -565,10 +566,10 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) {
}
matches := mraw[0]
if matches[SUB_INDEX] != "pub.imports.foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("hello\r\n"), t)
}
@@ -630,17 +631,19 @@ func TestCrossAccountRequestReply(t *testing.T) {
}
matches := mraw[0]
if matches[SUB_INDEX] != "test.request" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
if matches[REPLY_INDEX] != "bar" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
// Make sure this looks like _INBOX
if !strings.HasPrefix(matches[REPLY_INDEX], "_INBOX.") {
t.Fatalf("Expected an _INBOX.* like reply, got '%s'", matches[REPLY_INDEX])
}
checkPayload(crFoo, []byte("help\r\n"), t)
go cfoo.parseAndFlush([]byte("PUB bar 2\r\n22\r\n"))
replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX])
go cfoo.parseAndFlush([]byte(replyOp))
// Now read the response from crBar
l, err = crBar.ReadString('\n')
@@ -653,13 +656,13 @@ func TestCrossAccountRequestReply(t *testing.T) {
}
matches = mraw[0]
if matches[SUB_INDEX] != "bar" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "11" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
if matches[REPLY_INDEX] != "" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX])
}
checkPayload(crBar, []byte("22\r\n"), t)
@@ -669,3 +672,13 @@ func TestCrossAccountRequestReply(t *testing.T) {
t.Fatalf("Expected no remaining routes on fooAcc, got %d", nr)
}
}
func BenchmarkNewRouteReply(b *testing.B) {
opts := defaultServerOptions
s := New(&opts)
c, _, _ := newClientForServer(s)
b.ResetTimer()
for i := 0; i < b.N; i++ {
c.newRouteReply()
}
}

View File

@@ -1480,11 +1480,11 @@ func (c *client) processUnsub(arg []byte) error {
return nil
}
func (c *client) msgHeader(mh []byte, sub *subscription) []byte {
func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte {
mh = append(mh, sub.sid...)
mh = append(mh, ' ')
if c.pa.reply != nil {
mh = append(mh, c.pa.reply...)
if reply != nil {
mh = append(mh, reply...)
mh = append(mh, ' ')
}
mh = append(mh, c.pa.szb...)
@@ -1636,6 +1636,32 @@ func (c *client) pubAllowed(subject []byte) bool {
return allowed
}
// Used to mimic client like replies.
const (
replyPrefix = "_INBOX."
replyPrefixLen = len(replyPrefix)
digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
base = 62
)
// newRouteReply is used when rewriting replies that cross account boundaries.
// These will look like _INBOX.XXXXXXXX, similar to the old style of replies for most clients.
func (c *client) newRouteReply() []byte {
// Check to see if we have our own rand yet. Global rand
// has contention with lots of clients, etc.
if c.in.prand == nil {
c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
var b = [15]byte{'_', 'I', 'N', 'B', 'O', 'X', '.'}
rn := c.in.prand.Int63()
for i, l := replyPrefixLen, rn; i < len(b); i++ {
b[i] = digits[l%base]
l /= base
}
return b[:]
}
// processMsg is called to process an inbound msg from a client.
func (c *client) processInboundMsg(msg []byte) {
// Snapshot server.
@@ -1703,15 +1729,18 @@ func (c *client) processInboundMsg(msg []byte) {
c.acc.mu.RUnlock()
// Get the results from the other account for the mapped "to" subject.
if rm != nil && rm.acc != nil && rm.acc.sl != nil {
var nrr []byte
if rm.ae {
c.acc.removeRoute(rm.from)
}
if c.pa.reply != nil {
rm.acc.addImplicitRoute(c.acc, string(c.pa.reply), string(c.pa.reply), true)
// We want to remap this to provide anonymity.
nrr = c.newRouteReply()
rm.acc.addImplicitRoute(c.acc, string(nrr), string(c.pa.reply), true)
}
// FIXME(dlc) - Do L1 cache trick from above.
rr := rm.acc.sl.Match(rm.to)
c.processMsgResults(rr, msg, []byte(rm.to))
c.processMsgResults(rr, msg, []byte(rm.to), nrr)
}
}
@@ -1726,12 +1755,12 @@ func (c *client) processInboundMsg(msg []byte) {
if c.typ == ROUTER {
c.processRoutedMsgResults(r, msg)
} else if c.typ == CLIENT {
c.processMsgResults(r, msg, c.pa.subject)
c.processMsgResults(r, msg, c.pa.subject, c.pa.reply)
}
}
// This processes the sublist results for a given message.
func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) {
func (c *client) processMsgResults(r *SublistResult, msg, subject, reply []byte) {
// msg header
msgh := c.msgb[:msgHeadProtoLen]
msgh = append(msgh, subject...)
@@ -1767,7 +1796,7 @@ func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) {
rmap[sub.client.route.remoteID] = routeSeen
sub.client.mu.Unlock()
}
// Check for mapped subs
// Check for import mapped subs
if sub.im != nil && sub.im.prefix != "" {
// Redo the subject here on the fly.
msgh := c.msgb[:msgHeadProtoLen]
@@ -1777,7 +1806,7 @@ func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) {
si = len(msgh)
}
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
mh := c.msgHeader(msgh[:si], sub, reply)
c.deliverMsg(sub, mh, msg)
}
@@ -1806,7 +1835,7 @@ func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) {
msgh = append(msgh, ' ')
si = len(msgh)
}
mh := c.msgHeader(msgh[:si], sub)
mh := c.msgHeader(msgh[:si], sub, reply)
if c.deliverMsg(sub, mh, msg) {
break
}

View File

@@ -194,7 +194,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) {
rsub = sub
continue
}
mh := c.msgHeader(msgh[:], sub)
mh := c.msgHeader(msgh[:], sub, c.pa.reply)
if c.deliverMsg(sub, mh, msg) {
c.Debugf("Redelivery succeeded for message on group '%q'", group)
return
@@ -203,7 +203,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) {
// If we are here we failed to find a local, see if we snapshotted a
// remote sub, and if so deliver to that.
if rsub != nil {
mh := c.msgHeader(msgh[:], rsub)
mh := c.msgHeader(msgh[:], rsub, c.pa.reply)
if c.deliverMsg(rsub, mh, msg) {
c.Debugf("Re-routing message on group '%q' to remote server", group)
return
@@ -236,7 +236,7 @@ func (c *client) processRoutedMsgResults(r *SublistResult, msg []byte) {
}
didDeliver := false
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
mh := c.msgHeader(msgh[:si], sub, c.pa.reply)
didDeliver = c.deliverMsg(sub, mh, msg)
}
if !didDeliver && c.srv != nil {
@@ -261,7 +261,7 @@ func (c *client) processRoutedMsgResults(r *SublistResult, msg []byte) {
sub.client.mu.Unlock()
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
mh := c.msgHeader(msgh[:si], sub, c.pa.reply)
c.deliverMsg(sub, mh, msg)
}
}