mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fix parser bug around MSG protocol.
Make sure we do the right thing when no args are presented for an MSG, e.g. MSG <spc>. Also do not parse at all of this is a client, only valid for routes.
This commit is contained in:
@@ -374,9 +374,6 @@ func (c *client) processMsgArgs(arg []byte) error {
|
||||
args = append(args, arg[start:])
|
||||
}
|
||||
|
||||
c.pa.subject = args[0]
|
||||
c.pa.sid = args[1]
|
||||
|
||||
switch len(args) {
|
||||
case 3:
|
||||
c.pa.reply = nil
|
||||
@@ -392,6 +389,11 @@ func (c *client) processMsgArgs(arg []byte) error {
|
||||
if c.pa.size < 0 {
|
||||
return fmt.Errorf("processMsgArgs Bad or Missing Size: '%s'", arg)
|
||||
}
|
||||
|
||||
// Common ones processed after check for arg length
|
||||
c.pa.subject = args[0]
|
||||
c.pa.sid = args[1]
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -104,7 +104,11 @@ func (c *client) parse(buf []byte) error {
|
||||
case 'U', 'u':
|
||||
c.state = OP_U
|
||||
case 'M', 'm':
|
||||
c.state = OP_M
|
||||
if c.typ == CLIENT {
|
||||
goto parseErr
|
||||
} else {
|
||||
c.state = OP_M
|
||||
}
|
||||
case 'C', 'c':
|
||||
c.state = OP_C
|
||||
case 'I', 'i':
|
||||
@@ -613,7 +617,7 @@ func (c *client) parse(buf []byte) error {
|
||||
c.state == CONNECT_ARG) && c.argBuf == nil {
|
||||
c.argBuf = c.scratch[:0]
|
||||
c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...)
|
||||
// FIXME, check max len
|
||||
// FIXME(dlc), check max control line len
|
||||
}
|
||||
// Check for split msg
|
||||
if (c.state == MSG_PAYLOAD || c.state == MSG_END) && c.msgBuf == nil {
|
||||
|
||||
@@ -11,6 +11,10 @@ func dummyClient() *client {
|
||||
return &client{}
|
||||
}
|
||||
|
||||
func dummyRouteClient() *client {
|
||||
return &client{typ: ROUTER}
|
||||
}
|
||||
|
||||
func TestParsePing(t *testing.T) {
|
||||
c := dummyClient()
|
||||
if c.state != OP_START {
|
||||
@@ -233,7 +237,7 @@ func TestParsePubBadSize(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestParseMsg(t *testing.T) {
|
||||
c := dummyClient()
|
||||
c := dummyRouteClient()
|
||||
|
||||
pub := []byte("MSG foo RSID:1:2 5\r\nhello\r")
|
||||
err := c.parse(pub)
|
||||
@@ -310,6 +314,22 @@ func TestParseMsgArg(t *testing.T) {
|
||||
testMsgArg(c, t)
|
||||
}
|
||||
|
||||
func TestParseMsgSpace(t *testing.T) {
|
||||
c := dummyRouteClient()
|
||||
|
||||
// Ivan bug he found
|
||||
if err := c.parse([]byte("MSG \r\n")); err == nil {
|
||||
t.Fatalf("Expected parse error for MSG <SPC>")
|
||||
}
|
||||
|
||||
c = dummyClient()
|
||||
|
||||
// Anything with an M from a client should parse error
|
||||
if err := c.parse([]byte("M")); err == nil {
|
||||
t.Fatalf("Expected parse error for M* from a client")
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldFail(t *testing.T) {
|
||||
c := dummyClient()
|
||||
|
||||
|
||||
@@ -344,6 +344,8 @@ func TestSplitDanglingArgBuf(t *testing.T) {
|
||||
|
||||
func TestSplitMsgArg(t *testing.T) {
|
||||
_, c, _ := setupClient()
|
||||
// Allow parser to process MSG
|
||||
c.typ = ROUTER
|
||||
|
||||
b := make([]byte, 1024)
|
||||
|
||||
@@ -371,7 +373,7 @@ func TestSplitMsgArg(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSplitBufferMsgOp(t *testing.T) {
|
||||
c := &client{subs: hashmap.New()}
|
||||
c := &client{subs: hashmap.New(), typ: ROUTER}
|
||||
msg := []byte("MSG foo.bar QRSID:15:3 _INBOX.22 11\r\nhello world\r")
|
||||
msg1 := msg[:2]
|
||||
msg2 := msg[2:9]
|
||||
|
||||
Reference in New Issue
Block a user