From 019c105ca70a897b68a0321ae34bd1bc1b6514f5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 14 May 2020 18:07:51 -0700 Subject: [PATCH] Updates based on feedback, more tests, few bug fixes Signed-off-by: Derek Collison --- server/client.go | 2 +- server/leafnode.go | 3 +- server/parser.go | 13 +++-- server/parser_test.go | 117 ++++++++++++++++++++++++++++++++++++++++++ server/route.go | 10 +++- 5 files changed, 136 insertions(+), 9 deletions(-) diff --git a/server/client.go b/server/client.go index 6bd54a84..1d54faf9 100644 --- a/server/client.go +++ b/server/client.go @@ -2654,7 +2654,7 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte, gwrply b // Check here if we have a header with our message. If this client can not // support we need to strip the headers from the payload. - // The actual header would have been processed correctluy for us, so just + // The actual header would have been processed correctly for us, so just // need to update payload. if c.pa.hdr > 0 && !sub.client.headers { msg = msg[c.pa.hdr:] diff --git a/server/leafnode.go b/server/leafnode.go index ad7d3964..ae4ef063 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1443,8 +1443,7 @@ func (c *client) processLeafUnsub(arg []byte) error { } func (c *client) processLeafHeaderMsgArgs(arg []byte) error { - fmt.Printf("arg is %q\n", arg) - return nil + return fmt.Errorf("headers not implemented for leafnodes yet") } func (c *client) processLeafMsgArgs(arg []byte) error { diff --git a/server/parser.go b/server/parser.go index 09e5cd56..b17b5aaf 100644 --- a/server/parser.go +++ b/server/parser.go @@ -287,6 +287,7 @@ func (c *client) parse(buf []byte) error { case ' ', '\t': continue default: + c.pa.hdr = 0 c.state = HMSG_ARG c.as = i } @@ -876,6 +877,7 @@ func (c *client) parse(buf []byte) error { case ' ', '\t': continue default: + c.pa.hdr = -1 c.state = MSG_ARG c.as = i } @@ -1050,8 +1052,8 @@ func (c *client) parse(buf []byte) error { if c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG || c.state == HPUB_ARG || c.state == ASUB_ARG || c.state == AUSUB_ARG || - c.state == MSG_ARG || c.state == MINUS_ERR_ARG || - c.state == CONNECT_ARG || c.state == INFO_ARG { + c.state == MSG_ARG || c.state == HMSG_ARG || + c.state == MINUS_ERR_ARG || c.state == CONNECT_ARG || c.state == INFO_ARG { // Setup a holder buffer to deal with split buffer scenario. if c.argBuf == nil { c.argBuf = c.scratch[:0] @@ -1084,7 +1086,6 @@ func (c *client) parse(buf []byte) error { // new buffer to hold the split message. if c.pa.size > cap(c.scratch)-len(c.argBuf) { lrem := len(buf[c.as:]) - // Consider it a protocol error when the remaining payload // is larger than the reported size for PUB. It can happen // when processing incomplete messages from rogue clients. @@ -1134,7 +1135,11 @@ func (c *client) clonePubArg() error { switch c.kind { case ROUTER, GATEWAY: - return c.processRoutedMsgArgs(c.argBuf) + if c.pa.hdr < 0 { + return c.processRoutedMsgArgs(c.argBuf) + } else { + return c.processRoutedHeaderMsgArgs(c.argBuf) + } case LEAF: return c.processLeafMsgArgs(c.argBuf) default: diff --git a/server/parser_test.go b/server/parser_test.go index 00528b88..dfdbf6b2 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -436,6 +436,123 @@ func TestParseHeaderPubArg(t *testing.T) { } } +func TestParseRoutedHeaderMsg(t *testing.T) { + c := dummyRouteClient() + + pub := []byte("HMSG $foo foo 10 8\r\nXXXhello\r") + if err := c.parse(pub); err == nil { + t.Fatalf("Expected an error") + } + + // Clear snapshots + c.argBuf, c.msgBuf, c.state = nil, nil, OP_START + + pub = []byte("HMSG $foo foo 3 8\r\nXXXhello\r") + err := c.parse(pub) + if err != nil || c.state != MSG_END_N { + t.Fatalf("Unexpected: %d : %v\n", c.state, err) + } + if !bytes.Equal(c.pa.account, []byte("$foo")) { + t.Fatalf("Did not parse account correctly: '$foo' vs '%s'\n", c.pa.account) + } + if !bytes.Equal(c.pa.subject, []byte("foo")) { + t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject) + } + if c.pa.reply != nil { + t.Fatalf("Did not parse reply correctly: 'nil' vs '%s'\n", c.pa.reply) + } + if c.pa.hdr != 3 { + t.Fatalf("Did not parse header size correctly: 3 vs %d\n", c.pa.hdr) + } + if c.pa.size != 8 { + t.Fatalf("Did not parse msg size correctly: 8 vs %d\n", c.pa.size) + } + + // Clear snapshots + c.argBuf, c.msgBuf, c.state = nil, nil, OP_START + + pub = []byte("HMSG $G foo.bar INBOX.22 3 14\r\nOK:hello world\r") + err = c.parse(pub) + if err != nil || c.state != MSG_END_N { + t.Fatalf("Unexpected: %d : %v\n", c.state, err) + } + if !bytes.Equal(c.pa.account, []byte("$G")) { + t.Fatalf("Did not parse account correctly: '$G' vs '%s'\n", c.pa.account) + } + if !bytes.Equal(c.pa.subject, []byte("foo.bar")) { + t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject) + } + if !bytes.Equal(c.pa.reply, []byte("INBOX.22")) { + t.Fatalf("Did not parse reply correctly: 'INBOX.22' vs '%s'\n", c.pa.reply) + } + if c.pa.hdr != 3 { + t.Fatalf("Did not parse header size correctly: 3 vs %d\n", c.pa.hdr) + } + if c.pa.size != 14 { + t.Fatalf("Did not parse msg size correctly: 14 vs %d\n", c.pa.size) + } + + // Clear snapshots + c.argBuf, c.msgBuf, c.state = nil, nil, OP_START + + pub = []byte("HMSG $G foo.bar + reply baz 3 14\r\nOK:hello world\r") + err = c.parse(pub) + if err != nil || c.state != MSG_END_N { + t.Fatalf("Unexpected: %d : %v\n", c.state, err) + } + if !bytes.Equal(c.pa.account, []byte("$G")) { + t.Fatalf("Did not parse account correctly: '$G' vs '%s'\n", c.pa.account) + } + if !bytes.Equal(c.pa.subject, []byte("foo.bar")) { + t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject) + } + if !bytes.Equal(c.pa.reply, []byte("reply")) { + t.Fatalf("Did not parse reply correctly: 'reply' vs '%s'\n", c.pa.reply) + } + if len(c.pa.queues) != 1 { + t.Fatalf("Expected 1 queue, got %d", len(c.pa.queues)) + } + if !bytes.Equal(c.pa.queues[0], []byte("baz")) { + t.Fatalf("Did not parse queues correctly: 'baz' vs '%q'\n", c.pa.queues[0]) + } + if c.pa.hdr != 3 { + t.Fatalf("Did not parse header size correctly: 3 vs %d\n", c.pa.hdr) + } + if c.pa.size != 14 { + t.Fatalf("Did not parse msg size correctly: 14 vs %d\n", c.pa.size) + } + + // Clear snapshots + c.argBuf, c.msgBuf, c.state = nil, nil, OP_START + + pub = []byte("HMSG $G foo.bar | baz 3 14\r\nOK:hello world\r") + err = c.parse(pub) + if err != nil || c.state != MSG_END_N { + t.Fatalf("Unexpected: %d : %v\n", c.state, err) + } + if !bytes.Equal(c.pa.account, []byte("$G")) { + t.Fatalf("Did not parse account correctly: '$G' vs '%s'\n", c.pa.account) + } + if !bytes.Equal(c.pa.subject, []byte("foo.bar")) { + t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject) + } + if !bytes.Equal(c.pa.reply, []byte("")) { + t.Fatalf("Did not parse reply correctly: '' vs '%s'\n", c.pa.reply) + } + if len(c.pa.queues) != 1 { + t.Fatalf("Expected 1 queue, got %d", len(c.pa.queues)) + } + if !bytes.Equal(c.pa.queues[0], []byte("baz")) { + t.Fatalf("Did not parse queues correctly: 'baz' vs '%q'\n", c.pa.queues[0]) + } + if c.pa.hdr != 3 { + t.Fatalf("Did not parse header size correctly: 3 vs %d\n", c.pa.hdr) + } + if c.pa.size != 14 { + t.Fatalf("Did not parse msg size correctly: 14 vs %d\n", c.pa.size) + } +} + func TestParseRouteMsg(t *testing.T) { c := dummyRouteClient() diff --git a/server/route.go b/server/route.go index ba0a4fab..c4dfccea 100644 --- a/server/route.go +++ b/server/route.go @@ -220,14 +220,20 @@ func (c *client) processRoutedHeaderMsgArgs(arg []byte) error { // Grab queue names. if c.pa.reply != nil { - c.pa.queues = args[4 : len(args)-1] + c.pa.queues = args[4 : len(args)-2] } else { - c.pa.queues = args[3 : len(args)-1] + c.pa.queues = args[3 : len(args)-2] } } + if c.pa.hdr < 0 { + return fmt.Errorf("processRoutedHeaderMsgArgs Bad or Missing Header Size: '%s'", arg) + } if c.pa.size < 0 { return fmt.Errorf("processRoutedHeaderMsgArgs Bad or Missing Size: '%s'", args) } + if c.pa.hdr > c.pa.size { + return fmt.Errorf("processRoutedHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg) + } // Common ones processed after check for arg length c.pa.account = args[0]