diff --git a/server/const.go b/server/const.go index 4131ba85..f58fb56e 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.1.1-RC5" + VERSION = "2.1.1-RC6" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/parser.go b/server/parser.go index 9f7b0430..3c129977 100644 --- a/server/parser.go +++ b/server/parser.go @@ -867,7 +867,9 @@ func (c *client) parse(buf []byte) error { // read buffer and we are not able to process the msg. if c.argBuf == nil { // Works also for MSG_ARG, when message comes from ROUTE. - c.clonePubArg() + if err := c.clonePubArg(); err != nil { + goto parseErr + } } // If we will overflow the scratch buffer, just create a @@ -917,15 +919,17 @@ func protoSnippet(start int, buf []byte) string { // clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but // we need to hold onto it into the next read. -func (c *client) clonePubArg() { +func (c *client) clonePubArg() error { // Just copy and re-process original arg buffer. c.argBuf = c.scratch[:0] c.argBuf = append(c.argBuf, c.pa.arg...) - // This is a routed msg - if c.pa.account != nil { - c.processRoutedMsgArgs(false, c.argBuf) - } else { - c.processPub(false, c.argBuf) + switch c.kind { + case ROUTER, GATEWAY: + return c.processRoutedMsgArgs(false, c.argBuf) + case LEAF: + return c.processLeafMsgArgs(false, c.argBuf) + default: + return c.processPub(false, c.argBuf) } } diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 66244bb8..35694360 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -98,6 +98,41 @@ func TestLeafNodeInfo(t *testing.T) { }) } +func TestLeafNodeSplitBuffer(t *testing.T) { + s, opts := runLeafServer() + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + nc.QueueSubscribe("foo", "bar", func(m *nats.Msg) { + m.Respond([]byte("ok")) + }) + nc.Flush() + + lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) + defer lc.Close() + sendProto(t, lc, "CONNECT {}\r\n") + checkLeafNodeConnected(t, s) + + leafSend, leafExpect := setupLeaf(t, lc, 2) + + leafSend("LS+ reply\r\nPING\r\n") + leafExpect(pongRe) + + leafSend("LMSG foo ") + time.Sleep(time.Millisecond) + leafSend("+ reply bar 2\r\n") + time.Sleep(time.Millisecond) + leafSend("OK\r") + time.Sleep(time.Millisecond) + leafSend("\n") + leafExpect(lmsgRe) +} + func TestNumLeafNodes(t *testing.T) { s, opts := runLeafServer() defer s.Shutdown()