mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Updates based on feedback, more tests, few bug fixes
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -2654,7 +2654,7 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte, gwrply b
|
||||
|
||||
// Check here if we have a header with our message. If this client can not
|
||||
// support we need to strip the headers from the payload.
|
||||
// The actual header would have been processed correctluy for us, so just
|
||||
// The actual header would have been processed correctly for us, so just
|
||||
// need to update payload.
|
||||
if c.pa.hdr > 0 && !sub.client.headers {
|
||||
msg = msg[c.pa.hdr:]
|
||||
|
||||
@@ -1443,8 +1443,7 @@ func (c *client) processLeafUnsub(arg []byte) error {
|
||||
}
|
||||
|
||||
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
|
||||
fmt.Printf("arg is %q\n", arg)
|
||||
return nil
|
||||
return fmt.Errorf("headers not implemented for leafnodes yet")
|
||||
}
|
||||
|
||||
func (c *client) processLeafMsgArgs(arg []byte) error {
|
||||
|
||||
@@ -287,6 +287,7 @@ func (c *client) parse(buf []byte) error {
|
||||
case ' ', '\t':
|
||||
continue
|
||||
default:
|
||||
c.pa.hdr = 0
|
||||
c.state = HMSG_ARG
|
||||
c.as = i
|
||||
}
|
||||
@@ -876,6 +877,7 @@ func (c *client) parse(buf []byte) error {
|
||||
case ' ', '\t':
|
||||
continue
|
||||
default:
|
||||
c.pa.hdr = -1
|
||||
c.state = MSG_ARG
|
||||
c.as = i
|
||||
}
|
||||
@@ -1050,8 +1052,8 @@ func (c *client) parse(buf []byte) error {
|
||||
if c.state == SUB_ARG || c.state == UNSUB_ARG ||
|
||||
c.state == PUB_ARG || c.state == HPUB_ARG ||
|
||||
c.state == ASUB_ARG || c.state == AUSUB_ARG ||
|
||||
c.state == MSG_ARG || c.state == MINUS_ERR_ARG ||
|
||||
c.state == CONNECT_ARG || c.state == INFO_ARG {
|
||||
c.state == MSG_ARG || c.state == HMSG_ARG ||
|
||||
c.state == MINUS_ERR_ARG || c.state == CONNECT_ARG || c.state == INFO_ARG {
|
||||
// Setup a holder buffer to deal with split buffer scenario.
|
||||
if c.argBuf == nil {
|
||||
c.argBuf = c.scratch[:0]
|
||||
@@ -1084,7 +1086,6 @@ func (c *client) parse(buf []byte) error {
|
||||
// new buffer to hold the split message.
|
||||
if c.pa.size > cap(c.scratch)-len(c.argBuf) {
|
||||
lrem := len(buf[c.as:])
|
||||
|
||||
// Consider it a protocol error when the remaining payload
|
||||
// is larger than the reported size for PUB. It can happen
|
||||
// when processing incomplete messages from rogue clients.
|
||||
@@ -1134,7 +1135,11 @@ func (c *client) clonePubArg() error {
|
||||
|
||||
switch c.kind {
|
||||
case ROUTER, GATEWAY:
|
||||
return c.processRoutedMsgArgs(c.argBuf)
|
||||
if c.pa.hdr < 0 {
|
||||
return c.processRoutedMsgArgs(c.argBuf)
|
||||
} else {
|
||||
return c.processRoutedHeaderMsgArgs(c.argBuf)
|
||||
}
|
||||
case LEAF:
|
||||
return c.processLeafMsgArgs(c.argBuf)
|
||||
default:
|
||||
|
||||
@@ -436,6 +436,123 @@ func TestParseHeaderPubArg(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseRoutedHeaderMsg(t *testing.T) {
|
||||
c := dummyRouteClient()
|
||||
|
||||
pub := []byte("HMSG $foo foo 10 8\r\nXXXhello\r")
|
||||
if err := c.parse(pub); err == nil {
|
||||
t.Fatalf("Expected an error")
|
||||
}
|
||||
|
||||
// Clear snapshots
|
||||
c.argBuf, c.msgBuf, c.state = nil, nil, OP_START
|
||||
|
||||
pub = []byte("HMSG $foo foo 3 8\r\nXXXhello\r")
|
||||
err := c.parse(pub)
|
||||
if err != nil || c.state != MSG_END_N {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
if !bytes.Equal(c.pa.account, []byte("$foo")) {
|
||||
t.Fatalf("Did not parse account correctly: '$foo' vs '%s'\n", c.pa.account)
|
||||
}
|
||||
if !bytes.Equal(c.pa.subject, []byte("foo")) {
|
||||
t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject)
|
||||
}
|
||||
if c.pa.reply != nil {
|
||||
t.Fatalf("Did not parse reply correctly: 'nil' vs '%s'\n", c.pa.reply)
|
||||
}
|
||||
if c.pa.hdr != 3 {
|
||||
t.Fatalf("Did not parse header size correctly: 3 vs %d\n", c.pa.hdr)
|
||||
}
|
||||
if c.pa.size != 8 {
|
||||
t.Fatalf("Did not parse msg size correctly: 8 vs %d\n", c.pa.size)
|
||||
}
|
||||
|
||||
// Clear snapshots
|
||||
c.argBuf, c.msgBuf, c.state = nil, nil, OP_START
|
||||
|
||||
pub = []byte("HMSG $G foo.bar INBOX.22 3 14\r\nOK:hello world\r")
|
||||
err = c.parse(pub)
|
||||
if err != nil || c.state != MSG_END_N {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
if !bytes.Equal(c.pa.account, []byte("$G")) {
|
||||
t.Fatalf("Did not parse account correctly: '$G' vs '%s'\n", c.pa.account)
|
||||
}
|
||||
if !bytes.Equal(c.pa.subject, []byte("foo.bar")) {
|
||||
t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject)
|
||||
}
|
||||
if !bytes.Equal(c.pa.reply, []byte("INBOX.22")) {
|
||||
t.Fatalf("Did not parse reply correctly: 'INBOX.22' vs '%s'\n", c.pa.reply)
|
||||
}
|
||||
if c.pa.hdr != 3 {
|
||||
t.Fatalf("Did not parse header size correctly: 3 vs %d\n", c.pa.hdr)
|
||||
}
|
||||
if c.pa.size != 14 {
|
||||
t.Fatalf("Did not parse msg size correctly: 14 vs %d\n", c.pa.size)
|
||||
}
|
||||
|
||||
// Clear snapshots
|
||||
c.argBuf, c.msgBuf, c.state = nil, nil, OP_START
|
||||
|
||||
pub = []byte("HMSG $G foo.bar + reply baz 3 14\r\nOK:hello world\r")
|
||||
err = c.parse(pub)
|
||||
if err != nil || c.state != MSG_END_N {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
if !bytes.Equal(c.pa.account, []byte("$G")) {
|
||||
t.Fatalf("Did not parse account correctly: '$G' vs '%s'\n", c.pa.account)
|
||||
}
|
||||
if !bytes.Equal(c.pa.subject, []byte("foo.bar")) {
|
||||
t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject)
|
||||
}
|
||||
if !bytes.Equal(c.pa.reply, []byte("reply")) {
|
||||
t.Fatalf("Did not parse reply correctly: 'reply' vs '%s'\n", c.pa.reply)
|
||||
}
|
||||
if len(c.pa.queues) != 1 {
|
||||
t.Fatalf("Expected 1 queue, got %d", len(c.pa.queues))
|
||||
}
|
||||
if !bytes.Equal(c.pa.queues[0], []byte("baz")) {
|
||||
t.Fatalf("Did not parse queues correctly: 'baz' vs '%q'\n", c.pa.queues[0])
|
||||
}
|
||||
if c.pa.hdr != 3 {
|
||||
t.Fatalf("Did not parse header size correctly: 3 vs %d\n", c.pa.hdr)
|
||||
}
|
||||
if c.pa.size != 14 {
|
||||
t.Fatalf("Did not parse msg size correctly: 14 vs %d\n", c.pa.size)
|
||||
}
|
||||
|
||||
// Clear snapshots
|
||||
c.argBuf, c.msgBuf, c.state = nil, nil, OP_START
|
||||
|
||||
pub = []byte("HMSG $G foo.bar | baz 3 14\r\nOK:hello world\r")
|
||||
err = c.parse(pub)
|
||||
if err != nil || c.state != MSG_END_N {
|
||||
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
|
||||
}
|
||||
if !bytes.Equal(c.pa.account, []byte("$G")) {
|
||||
t.Fatalf("Did not parse account correctly: '$G' vs '%s'\n", c.pa.account)
|
||||
}
|
||||
if !bytes.Equal(c.pa.subject, []byte("foo.bar")) {
|
||||
t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject)
|
||||
}
|
||||
if !bytes.Equal(c.pa.reply, []byte("")) {
|
||||
t.Fatalf("Did not parse reply correctly: '' vs '%s'\n", c.pa.reply)
|
||||
}
|
||||
if len(c.pa.queues) != 1 {
|
||||
t.Fatalf("Expected 1 queue, got %d", len(c.pa.queues))
|
||||
}
|
||||
if !bytes.Equal(c.pa.queues[0], []byte("baz")) {
|
||||
t.Fatalf("Did not parse queues correctly: 'baz' vs '%q'\n", c.pa.queues[0])
|
||||
}
|
||||
if c.pa.hdr != 3 {
|
||||
t.Fatalf("Did not parse header size correctly: 3 vs %d\n", c.pa.hdr)
|
||||
}
|
||||
if c.pa.size != 14 {
|
||||
t.Fatalf("Did not parse msg size correctly: 14 vs %d\n", c.pa.size)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseRouteMsg(t *testing.T) {
|
||||
c := dummyRouteClient()
|
||||
|
||||
|
||||
@@ -220,14 +220,20 @@ func (c *client) processRoutedHeaderMsgArgs(arg []byte) error {
|
||||
|
||||
// Grab queue names.
|
||||
if c.pa.reply != nil {
|
||||
c.pa.queues = args[4 : len(args)-1]
|
||||
c.pa.queues = args[4 : len(args)-2]
|
||||
} else {
|
||||
c.pa.queues = args[3 : len(args)-1]
|
||||
c.pa.queues = args[3 : len(args)-2]
|
||||
}
|
||||
}
|
||||
if c.pa.hdr < 0 {
|
||||
return fmt.Errorf("processRoutedHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
|
||||
}
|
||||
if c.pa.size < 0 {
|
||||
return fmt.Errorf("processRoutedHeaderMsgArgs Bad or Missing Size: '%s'", args)
|
||||
}
|
||||
if c.pa.hdr > c.pa.size {
|
||||
return fmt.Errorf("processRoutedHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
|
||||
}
|
||||
|
||||
// Common ones processed after check for arg length
|
||||
c.pa.account = args[0]
|
||||
|
||||
Reference in New Issue
Block a user