From fd4612fab021659cd42be70d3c0938d620a09b73 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 20 Nov 2012 08:15:28 -0800 Subject: [PATCH] Properly handle unsub and pub/msg split buffer scenarios --- server/parser.go | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/server/parser.go b/server/parser.go index d286902b..5645ceff 100644 --- a/server/parser.go +++ b/server/parser.go @@ -4,7 +4,6 @@ package server import ( "fmt" -// "log" ) type pubArg struct { @@ -59,8 +58,6 @@ func (c *client) parse(buf []byte) error { var i int var b byte -// log.Printf("parse: len = %d, cap = %d\n", len(buf), cap(buf)) -// log.Printf("Parse bytes: '%s'\n", buf) c.nr++ c.nb += len(buf) @@ -111,14 +108,13 @@ func (c *client) parse(buf []byte) error { var arg []byte if c.argBuf != nil { arg = c.argBuf - c.argBuf = nil } else { arg = buf[c.as : i-c.drop] } + c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD if err := c.processPub(arg); err != nil { return err } - c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD default: if c.argBuf != nil { c.argBuf = append(c.argBuf, b) @@ -128,7 +124,7 @@ func (c *client) parse(buf []byte) error { if c.msgBuf != nil { if len(c.msgBuf) >= c.pa.size { c.processMsg(c.msgBuf) - c.msgBuf, c.state = nil, MSG_END + c.argBuf, c.msgBuf, c.state = nil, nil, MSG_END } else { c.msgBuf = append(c.msgBuf, b) } @@ -327,15 +323,22 @@ func (c *client) parse(buf []byte) error { goto parseErr } } - // Check for split buffer scenarios - if (c.state == SUB_ARG || c.state == PUB_ARG) && c.argBuf == nil { + // Check for split buffer scenarios for SUB and UNSUB and PUB + if (c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG) && c.argBuf == nil { c.argBuf = c.scratch[:0] c.argBuf = append(c.argBuf, buf[c.as:(i+1)-c.drop]...) // FIXME, check max len } + // Check for split msg if c.state == MSG_PAYLOAD && c.msgBuf == nil { + // We need to clone the pubArg if it is still referencing the + // read buffer and we are not able to process the msg. + if c.argBuf == nil { + c.clonePubArg() + } // FIXME: copy better here? Make whole buf if large? - c.msgBuf = c.scratch[:0] + //c.msgBuf = c.scratch[:0] + c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)] c.msgBuf = append(c.msgBuf, (buf[c.as:])...) } return nil @@ -343,3 +346,16 @@ func (c *client) parse(buf []byte) error { parseErr: return fmt.Errorf("Parse Error [%d]: '%s'", c.state, buf[i:]) } + + +// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but +// we need to hold onto it into the next read. +func (c *client) clonePubArg() { + c.argBuf = c.scratch[:0] + c.argBuf = append(c.argBuf, c.pa.subject...) + c.argBuf = append(c.argBuf, c.pa.reply...) + c.argBuf = append(c.argBuf, c.pa.szb...) + c.pa.subject = c.argBuf[:len(c.pa.subject)] + c.pa.reply = c.argBuf[len(c.pa.subject) : len(c.pa.subject)+len(c.pa.reply)] + c.pa.szb = c.argBuf[len(c.pa.subject)+len(c.pa.reply):] +}