diff --git a/server/accounts_test.go b/server/accounts_test.go index b4b2bcf9..8b44549a 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -14,6 +14,7 @@ package server import ( + "fmt" "os" "strings" "testing" @@ -69,7 +70,7 @@ func TestAccountIsolation(t *testing.T) { t.Fatalf("Error for client 'bar' from server: %v", err) } if !strings.HasPrefix(l, "PONG\r\n") { - t.Fatalf("PONG response incorrect: %q\n", l) + t.Fatalf("PONG response incorrect: %q", l) } go cfoo.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n")) @@ -83,7 +84,7 @@ func TestAccountIsolation(t *testing.T) { t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } checkPayload(crFoo, []byte("hello\r\n"), t) @@ -93,7 +94,7 @@ func TestAccountIsolation(t *testing.T) { t.Fatalf("Error for client 'bar' from server: %v", err) } if !strings.HasPrefix(l, "PONG\r\n") { - t.Fatalf("PONG response incorrect: %q\n", l) + t.Fatalf("PONG response incorrect: %q", l) } } @@ -190,7 +191,7 @@ func TestAccountSimpleConfig(t *testing.T) { t.Fatalf("Received an error processing config file: %v", err) } if la := len(opts.Accounts); la != 2 { - t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la) + t.Fatalf("Expected to see 2 accounts in opts, got %d", la) } if !accountNameExists("foo", opts.Accounts) { t.Fatal("Expected a 'foo' account") @@ -232,11 +233,11 @@ func TestAccountParseConfig(t *testing.T) { } if la := len(opts.Accounts); la != 2 { - t.Fatalf("Expected to see 2 accounts in opts, got %d\n", la) + t.Fatalf("Expected to see 2 accounts in opts, got %d", la) } if lu := len(opts.Users); lu != 4 { - t.Fatalf("Expected 4 total Users, got %d\n", lu) + t.Fatalf("Expected 4 total Users, got %d", lu) } var natsAcc *Account @@ -391,10 +392,10 @@ func TestSimpleMapping(t *testing.T) { } matches := mraw[0] if matches[SUB_INDEX] != "import.foo" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != sid { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } } @@ -459,10 +460,10 @@ func TestNoPrefixWildcardMapping(t *testing.T) { } matches := mraw[0] if matches[SUB_INDEX] != "foo" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } checkPayload(crBar, []byte("hello\r\n"), t) } @@ -512,10 +513,10 @@ func TestPrefixWildcardMapping(t *testing.T) { } matches := mraw[0] if matches[SUB_INDEX] != "pub.imports.foo" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } checkPayload(crBar, []byte("hello\r\n"), t) } @@ -565,10 +566,10 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) { } matches := mraw[0] if matches[SUB_INDEX] != "pub.imports.foo" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } checkPayload(crBar, []byte("hello\r\n"), t) } @@ -630,17 +631,19 @@ func TestCrossAccountRequestReply(t *testing.T) { } matches := mraw[0] if matches[SUB_INDEX] != "test.request" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } - if matches[REPLY_INDEX] != "bar" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + // Make sure this looks like _INBOX + if !strings.HasPrefix(matches[REPLY_INDEX], "_INBOX.") { + t.Fatalf("Expected an _INBOX.* like reply, got '%s'", matches[REPLY_INDEX]) } checkPayload(crFoo, []byte("help\r\n"), t) - go cfoo.parseAndFlush([]byte("PUB bar 2\r\n22\r\n")) + replyOp := fmt.Sprintf("PUB %s 2\r\n22\r\n", matches[REPLY_INDEX]) + go cfoo.parseAndFlush([]byte(replyOp)) // Now read the response from crBar l, err = crBar.ReadString('\n') @@ -653,13 +656,13 @@ func TestCrossAccountRequestReply(t *testing.T) { } matches = mraw[0] if matches[SUB_INDEX] != "bar" { - t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) + t.Fatalf("Did not get correct subject: '%s'", matches[SUB_INDEX]) } if matches[SID_INDEX] != "11" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } if matches[REPLY_INDEX] != "" { - t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) + t.Fatalf("Did not get correct sid: '%s'", matches[SID_INDEX]) } checkPayload(crBar, []byte("22\r\n"), t) @@ -669,3 +672,13 @@ func TestCrossAccountRequestReply(t *testing.T) { t.Fatalf("Expected no remaining routes on fooAcc, got %d", nr) } } + +func BenchmarkNewRouteReply(b *testing.B) { + opts := defaultServerOptions + s := New(&opts) + c, _, _ := newClientForServer(s) + b.ResetTimer() + for i := 0; i < b.N; i++ { + c.newRouteReply() + } +} diff --git a/server/client.go b/server/client.go index 30f86d79..1589ecad 100644 --- a/server/client.go +++ b/server/client.go @@ -1480,11 +1480,11 @@ func (c *client) processUnsub(arg []byte) error { return nil } -func (c *client) msgHeader(mh []byte, sub *subscription) []byte { +func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte { mh = append(mh, sub.sid...) mh = append(mh, ' ') - if c.pa.reply != nil { - mh = append(mh, c.pa.reply...) + if reply != nil { + mh = append(mh, reply...) mh = append(mh, ' ') } mh = append(mh, c.pa.szb...) @@ -1636,6 +1636,32 @@ func (c *client) pubAllowed(subject []byte) bool { return allowed } +// Used to mimic client like replies. +const ( + replyPrefix = "_INBOX." + replyPrefixLen = len(replyPrefix) + digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + base = 62 +) + +// newRouteReply is used when rewriting replies that cross account boundaries. +// These will look like _INBOX.XXXXXXXX, similar to the old style of replies for most clients. +func (c *client) newRouteReply() []byte { + // Check to see if we have our own rand yet. Global rand + // has contention with lots of clients, etc. + if c.in.prand == nil { + c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano())) + } + + var b = [15]byte{'_', 'I', 'N', 'B', 'O', 'X', '.'} + rn := c.in.prand.Int63() + for i, l := replyPrefixLen, rn; i < len(b); i++ { + b[i] = digits[l%base] + l /= base + } + return b[:] +} + // processMsg is called to process an inbound msg from a client. func (c *client) processInboundMsg(msg []byte) { // Snapshot server. @@ -1703,15 +1729,18 @@ func (c *client) processInboundMsg(msg []byte) { c.acc.mu.RUnlock() // Get the results from the other account for the mapped "to" subject. if rm != nil && rm.acc != nil && rm.acc.sl != nil { + var nrr []byte if rm.ae { c.acc.removeRoute(rm.from) } if c.pa.reply != nil { - rm.acc.addImplicitRoute(c.acc, string(c.pa.reply), string(c.pa.reply), true) + // We want to remap this to provide anonymity. + nrr = c.newRouteReply() + rm.acc.addImplicitRoute(c.acc, string(nrr), string(c.pa.reply), true) } // FIXME(dlc) - Do L1 cache trick from above. rr := rm.acc.sl.Match(rm.to) - c.processMsgResults(rr, msg, []byte(rm.to)) + c.processMsgResults(rr, msg, []byte(rm.to), nrr) } } @@ -1726,12 +1755,12 @@ func (c *client) processInboundMsg(msg []byte) { if c.typ == ROUTER { c.processRoutedMsgResults(r, msg) } else if c.typ == CLIENT { - c.processMsgResults(r, msg, c.pa.subject) + c.processMsgResults(r, msg, c.pa.subject, c.pa.reply) } } // This processes the sublist results for a given message. -func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) { +func (c *client) processMsgResults(r *SublistResult, msg, subject, reply []byte) { // msg header msgh := c.msgb[:msgHeadProtoLen] msgh = append(msgh, subject...) @@ -1767,7 +1796,7 @@ func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) { rmap[sub.client.route.remoteID] = routeSeen sub.client.mu.Unlock() } - // Check for mapped subs + // Check for import mapped subs if sub.im != nil && sub.im.prefix != "" { // Redo the subject here on the fly. msgh := c.msgb[:msgHeadProtoLen] @@ -1777,7 +1806,7 @@ func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) { si = len(msgh) } // Normal delivery - mh := c.msgHeader(msgh[:si], sub) + mh := c.msgHeader(msgh[:si], sub, reply) c.deliverMsg(sub, mh, msg) } @@ -1806,7 +1835,7 @@ func (c *client) processMsgResults(r *SublistResult, msg, subject []byte) { msgh = append(msgh, ' ') si = len(msgh) } - mh := c.msgHeader(msgh[:si], sub) + mh := c.msgHeader(msgh[:si], sub, reply) if c.deliverMsg(sub, mh, msg) { break } diff --git a/server/route.go b/server/route.go index 37c32bca..bbfa4bf2 100644 --- a/server/route.go +++ b/server/route.go @@ -194,7 +194,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) { rsub = sub continue } - mh := c.msgHeader(msgh[:], sub) + mh := c.msgHeader(msgh[:], sub, c.pa.reply) if c.deliverMsg(sub, mh, msg) { c.Debugf("Redelivery succeeded for message on group '%q'", group) return @@ -203,7 +203,7 @@ func (c *client) reRouteQMsg(r *SublistResult, msgh, msg, group []byte) { // If we are here we failed to find a local, see if we snapshotted a // remote sub, and if so deliver to that. if rsub != nil { - mh := c.msgHeader(msgh[:], rsub) + mh := c.msgHeader(msgh[:], rsub, c.pa.reply) if c.deliverMsg(rsub, mh, msg) { c.Debugf("Re-routing message on group '%q' to remote server", group) return @@ -236,7 +236,7 @@ func (c *client) processRoutedMsgResults(r *SublistResult, msg []byte) { } didDeliver := false if sub != nil { - mh := c.msgHeader(msgh[:si], sub) + mh := c.msgHeader(msgh[:si], sub, c.pa.reply) didDeliver = c.deliverMsg(sub, mh, msg) } if !didDeliver && c.srv != nil { @@ -261,7 +261,7 @@ func (c *client) processRoutedMsgResults(r *SublistResult, msg []byte) { sub.client.mu.Unlock() // Normal delivery - mh := c.msgHeader(msgh[:si], sub) + mh := c.msgHeader(msgh[:si], sub, c.pa.reply) c.deliverMsg(sub, mh, msg) } }