mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Properly handle unsub and pub/msg split buffer scenarios
This commit is contained in:
@@ -4,7 +4,6 @@ package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
// "log"
|
||||
)
|
||||
|
||||
type pubArg struct {
|
||||
@@ -59,8 +58,6 @@ func (c *client) parse(buf []byte) error {
|
||||
var i int
|
||||
var b byte
|
||||
|
||||
// log.Printf("parse: len = %d, cap = %d\n", len(buf), cap(buf))
|
||||
// log.Printf("Parse bytes: '%s'\n", buf)
|
||||
c.nr++
|
||||
c.nb += len(buf)
|
||||
|
||||
@@ -111,14 +108,13 @@ func (c *client) parse(buf []byte) error {
|
||||
var arg []byte
|
||||
if c.argBuf != nil {
|
||||
arg = c.argBuf
|
||||
c.argBuf = nil
|
||||
} else {
|
||||
arg = buf[c.as : i-c.drop]
|
||||
}
|
||||
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
|
||||
if err := c.processPub(arg); err != nil {
|
||||
return err
|
||||
}
|
||||
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
|
||||
default:
|
||||
if c.argBuf != nil {
|
||||
c.argBuf = append(c.argBuf, b)
|
||||
@@ -128,7 +124,7 @@ func (c *client) parse(buf []byte) error {
|
||||
if c.msgBuf != nil {
|
||||
if len(c.msgBuf) >= c.pa.size {
|
||||
c.processMsg(c.msgBuf)
|
||||
c.msgBuf, c.state = nil, MSG_END
|
||||
c.argBuf, c.msgBuf, c.state = nil, nil, MSG_END
|
||||
} else {
|
||||
c.msgBuf = append(c.msgBuf, b)
|
||||
}
|
||||
@@ -327,15 +323,22 @@ func (c *client) parse(buf []byte) error {
|
||||
goto parseErr
|
||||
}
|
||||
}
|
||||
// Check for split buffer scenarios
|
||||
if (c.state == SUB_ARG || c.state == PUB_ARG) && c.argBuf == nil {
|
||||
// Check for split buffer scenarios for SUB and UNSUB and PUB
|
||||
if (c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG) && c.argBuf == nil {
|
||||
c.argBuf = c.scratch[:0]
|
||||
c.argBuf = append(c.argBuf, buf[c.as:(i+1)-c.drop]...)
|
||||
// FIXME, check max len
|
||||
}
|
||||
// Check for split msg
|
||||
if c.state == MSG_PAYLOAD && c.msgBuf == nil {
|
||||
// We need to clone the pubArg if it is still referencing the
|
||||
// read buffer and we are not able to process the msg.
|
||||
if c.argBuf == nil {
|
||||
c.clonePubArg()
|
||||
}
|
||||
// FIXME: copy better here? Make whole buf if large?
|
||||
c.msgBuf = c.scratch[:0]
|
||||
//c.msgBuf = c.scratch[:0]
|
||||
c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
|
||||
c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
|
||||
}
|
||||
return nil
|
||||
@@ -343,3 +346,16 @@ func (c *client) parse(buf []byte) error {
|
||||
parseErr:
|
||||
return fmt.Errorf("Parse Error [%d]: '%s'", c.state, buf[i:])
|
||||
}
|
||||
|
||||
|
||||
// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
|
||||
// we need to hold onto it into the next read.
|
||||
func (c *client) clonePubArg() {
|
||||
c.argBuf = c.scratch[:0]
|
||||
c.argBuf = append(c.argBuf, c.pa.subject...)
|
||||
c.argBuf = append(c.argBuf, c.pa.reply...)
|
||||
c.argBuf = append(c.argBuf, c.pa.szb...)
|
||||
c.pa.subject = c.argBuf[:len(c.pa.subject)]
|
||||
c.pa.reply = c.argBuf[len(c.pa.subject) : len(c.pa.subject)+len(c.pa.reply)]
|
||||
c.pa.szb = c.argBuf[len(c.pa.subject)+len(c.pa.reply):]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user