mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-15 18:50:41 -07:00
Merge pull request #1198 from nats-io/fix_clone_msg_args
[FIXED] Handling of split buffer for LEAF messages
This commit is contained in:
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.1.1-RC5"
|
||||
VERSION = "2.1.1-RC6"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -867,7 +867,9 @@ func (c *client) parse(buf []byte) error {
|
||||
// read buffer and we are not able to process the msg.
|
||||
if c.argBuf == nil {
|
||||
// Works also for MSG_ARG, when message comes from ROUTE.
|
||||
c.clonePubArg()
|
||||
if err := c.clonePubArg(); err != nil {
|
||||
goto parseErr
|
||||
}
|
||||
}
|
||||
|
||||
// If we will overflow the scratch buffer, just create a
|
||||
@@ -917,15 +919,17 @@ func protoSnippet(start int, buf []byte) string {
|
||||
|
||||
// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
|
||||
// we need to hold onto it into the next read.
|
||||
func (c *client) clonePubArg() {
|
||||
func (c *client) clonePubArg() error {
|
||||
// Just copy and re-process original arg buffer.
|
||||
c.argBuf = c.scratch[:0]
|
||||
c.argBuf = append(c.argBuf, c.pa.arg...)
|
||||
|
||||
// This is a routed msg
|
||||
if c.pa.account != nil {
|
||||
c.processRoutedMsgArgs(false, c.argBuf)
|
||||
} else {
|
||||
c.processPub(false, c.argBuf)
|
||||
switch c.kind {
|
||||
case ROUTER, GATEWAY:
|
||||
return c.processRoutedMsgArgs(false, c.argBuf)
|
||||
case LEAF:
|
||||
return c.processLeafMsgArgs(false, c.argBuf)
|
||||
default:
|
||||
return c.processPub(false, c.argBuf)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,6 +98,41 @@ func TestLeafNodeInfo(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestLeafNodeSplitBuffer(t *testing.T) {
|
||||
s, opts := runLeafServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, err := nats.Connect(s.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
nc.QueueSubscribe("foo", "bar", func(m *nats.Msg) {
|
||||
m.Respond([]byte("ok"))
|
||||
})
|
||||
nc.Flush()
|
||||
|
||||
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc.Close()
|
||||
sendProto(t, lc, "CONNECT {}\r\n")
|
||||
checkLeafNodeConnected(t, s)
|
||||
|
||||
leafSend, leafExpect := setupLeaf(t, lc, 2)
|
||||
|
||||
leafSend("LS+ reply\r\nPING\r\n")
|
||||
leafExpect(pongRe)
|
||||
|
||||
leafSend("LMSG foo ")
|
||||
time.Sleep(time.Millisecond)
|
||||
leafSend("+ reply bar 2\r\n")
|
||||
time.Sleep(time.Millisecond)
|
||||
leafSend("OK\r")
|
||||
time.Sleep(time.Millisecond)
|
||||
leafSend("\n")
|
||||
leafExpect(lmsgRe)
|
||||
}
|
||||
|
||||
func TestNumLeafNodes(t *testing.T) {
|
||||
s, opts := runLeafServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
Reference in New Issue
Block a user