From 025b63300bcf757ca7dc308b3067423d3265538e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 29 Jul 2013 17:09:41 -0700 Subject: [PATCH] 1-hop semantics for messages from routes --- server/client.go | 9 +++++ server/parser.go | 1 + server/parser_test.go | 79 ++++++++++++++++++++++++++++++++++++++++++- test/routes_test.go | 20 ++++++++++- test/test.go | 11 +++++- 5 files changed, 117 insertions(+), 3 deletions(-) diff --git a/server/client.go b/server/client.go index 4da234d3..0ca5c806 100644 --- a/server/client.go +++ b/server/client.go @@ -582,8 +582,17 @@ func (c *client) processMsg(msg []byte) { var qmap map[string][]*subscription var qsubs []*subscription + isRoute := c.typ == ROUTER + for _, v := range r { sub := v.(*subscription) + + // Skip if sourced from a ROUTER and going to another ROUYTER. + // This is one-hop semantics for ROUTERs. + if isRoute && sub.client.typ == ROUTER { + continue + } + if sub.queue != nil { // FIXME(dlc), this can be more efficient if qmap == nil { diff --git a/server/parser.go b/server/parser.go index cec6ca4f..32524de9 100644 --- a/server/parser.go +++ b/server/parser.go @@ -9,6 +9,7 @@ import ( type pubArg struct { subject []byte reply []byte + sid []byte szb []byte size int } diff --git a/server/parser_test.go b/server/parser_test.go index b52bf9da..abfdda97 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -1,5 +1,5 @@ -// Copyright 2012 Apcera Inc. All rights reserved. +// Copyright 2012-2013 Apcera Inc. All rights reserved. package server @@ -223,6 +223,83 @@ func TestParsePubArg(t *testing.T) { testPubArg(c, t) } +func TestParseMsg(t *testing.T) { + c := dummyClient() + + pub := []byte("MSG foo RSID:1:2 5\r\nhello\r") + err := c.parse(pub) + if err != nil || c.state != MSG_END { + t.Fatalf("Unexpected: %d : %v\n", c.state, err) + } + if !bytes.Equal(c.pa.subject, []byte("foo")) { + t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject) + } + if c.pa.reply != nil { + t.Fatalf("Did not parse reply correctly: 'nil' vs '%s'\n", c.pa.reply) + } + if c.pa.size != 5 { + t.Fatalf("Did not parse msg size correctly: 5 vs %d\n", c.pa.size) + } + if !bytes.Equal(c.pa.sid, []byte("RSID:1:2")) { + t.Fatalf("Did not parse sid correctly: 'RSID:1:2' vs '%s'\n", c.pa.sid) + } + + c.state = OP_START + + pub = []byte("MSG foo.bar RSID:1:2 INBOX.22 11\r\nhello world\r") + err = c.parse(pub) + if err != nil || c.state != MSG_END { + t.Fatalf("Unexpected: %d : %v\n", c.state, err) + } + if !bytes.Equal(c.pa.subject, []byte("foo.bar")) { + t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject) + } + if !bytes.Equal(c.pa.reply, []byte("INBOX.22")) { + t.Fatalf("Did not parse reply correctly: 'INBOX.22' vs '%s'\n", c.pa.reply) + } + if c.pa.size != 11 { + t.Fatalf("Did not parse msg size correctly: 11 vs %d\n", c.pa.size) + } +} + +func testMsgArg(c *client, t *testing.T) { + if !bytes.Equal(c.pa.subject, []byte("foobar")) { + t.Fatalf("Mismatched subject: '%s'\n", c.pa.subject) + } + if !bytes.Equal(c.pa.szb, []byte("22")) { + t.Fatalf("Bad size buf: '%s'\n", c.pa.szb) + } + if c.pa.size != 22 { + t.Fatalf("Bad size: %d\n", c.pa.size) + } + if !bytes.Equal(c.pa.sid, []byte("RSID:22:1")) { + t.Fatalf("Bad sid: '%s'\n", c.pa.sid) + } +} + +func TestParseMsgArg(t *testing.T) { + c := dummyClient() + if err := c.processMsgArgs([]byte("foobar RSID:22:1 22")) ; err != nil { + t.Fatalf("Unexpected parse error: %v\n", err) + } + testMsgArg(c, t) + if err := c.processMsgArgs([]byte(" foobar RSID:22:1 22")) ; err != nil { + t.Fatalf("Unexpected parse error: %v\n", err) + } + testMsgArg(c, t) + if err := c.processMsgArgs([]byte(" foobar RSID:22:1 22 ")) ; err != nil { + t.Fatalf("Unexpected parse error: %v\n", err) + } + testMsgArg(c, t) + if err := c.processMsgArgs([]byte("foobar RSID:22:1 \t22")) ; err != nil { + t.Fatalf("Unexpected parse error: %v\n", err) + } + if err := c.processMsgArgs([]byte("foobar\t\tRSID:22:1\t22\r")) ; err != nil { + t.Fatalf("Unexpected parse error: %v\n", err) + } + testMsgArg(c, t) +} + func TestShouldFail(t *testing.T) { c := dummyClient() diff --git a/test/routes_test.go b/test/routes_test.go index 317e3cff..c16908ec 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -182,9 +182,27 @@ func TestRouteForwardsMsgToClients(t *testing.T) { clientSend("PING\r\n") clientExpect(pongRe) - // Send MSG via route connection + // Send MSG proto via route connection routeSend("MSG foo 1 2\r\nok\r\n") matches := expectMsgs(1) checkMsg(t, matches[0], "foo", "1", "", "2", "ok") } + +func TestRouteOneHopSemantics(t *testing.T) { + s, opts := runRouteServer(t) + defer s.Shutdown() + + route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + expectAuthRequired(t, route) + routeSend, _ := setupRoute(t, route, opts) + + // Express interest on this route for foo. + routeSend("SUB foo RSID:2:2\r\n") + + // Send MSG proto via route connection + routeSend("MSG foo 1 2\r\nok\r\n") + + // Make sure it does not come back! + expectNothing(t, route) +} \ No newline at end of file diff --git a/test/test.go b/test/test.go index 31932807..4c0065e6 100644 --- a/test/test.go +++ b/test/test.go @@ -271,7 +271,6 @@ var expBuf = make([]byte, 32768) // Test result from server against regexp func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte { // Wait for commands to be processed and results queued for read - // time.Sleep(10 * time.Millisecond) c.SetReadDeadline(time.Now().Add(1 * time.Second)) defer c.SetReadDeadline(time.Time{}) @@ -287,6 +286,16 @@ func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte { return buf } +func expectNothing(t tLogger, c net.Conn) { + c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + defer c.SetReadDeadline(time.Time{}) + + n, err := c.Read(expBuf) + if err == nil && n > 0 { + stackFatalf(t, "Expected nothing, received: '%q'\n", expBuf[:n]) + } +} + // This will check that we got what we expected. func checkMsg(t tLogger, m [][]byte, subject, sid, reply, len, msg string) { if string(m[SUB_INDEX]) != subject {