First pass at headers awareness for server

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-04-30 17:05:37 -07:00
parent 3ab203d164
commit d51566881e
9 changed files with 370 additions and 80 deletions

View File

@@ -17,6 +17,17 @@ import (
"fmt"
)
type parserState int
type parseState struct {
state parserState
as int
drop int
pa pubArg
argBuf []byte
msgBuf []byte
scratch [MAX_CONTROL_LINE_SIZE]byte
}
type pubArg struct {
arg []byte
pacache []byte
@@ -27,17 +38,7 @@ type pubArg struct {
szb []byte
queues [][]byte
size int
}
type parserState int
type parseState struct {
state parserState
as int
drop int
pa pubArg
argBuf []byte
msgBuf []byte
scratch [MAX_CONTROL_LINE_SIZE]byte
hdr int
}
// Parser constants
@@ -60,6 +61,12 @@ const (
OP_CONNEC
OP_CONNECT
CONNECT_ARG
OP_H
OP_HP
OP_HPU
OP_HPUB
OP_HPUB_SPC
HPUB_ARG
OP_P
OP_PU
OP_PUB
@@ -144,6 +151,8 @@ func (c *client) parse(buf []byte) error {
switch b {
case 'P', 'p':
c.state = OP_P
case 'H', 'h':
c.state = OP_H
case 'S', 's':
c.state = OP_S
case 'U', 'u':
@@ -177,6 +186,73 @@ func (c *client) parse(buf []byte) error {
default:
goto parseErr
}
case OP_H:
switch b {
case 'P', 'p':
c.state = OP_HP
default:
goto parseErr
}
case OP_HP:
switch b {
case 'U', 'u':
c.state = OP_HPU
default:
goto parseErr
}
case OP_HPU:
switch b {
case 'B', 'b':
c.state = OP_HPUB
default:
goto parseErr
}
case OP_HPUB:
switch b {
case ' ', '\t':
c.state = OP_HPUB_SPC
default:
goto parseErr
}
case OP_HPUB_SPC:
switch b {
case ' ', '\t':
continue
default:
c.pa.hdr = 0
c.state = HPUB_ARG
c.as = i
}
case HPUB_ARG:
switch b {
case '\r':
c.drop = 1
case '\n':
var arg []byte
if c.argBuf != nil {
arg = c.argBuf
c.argBuf = nil
} else {
arg = buf[c.as : i-c.drop]
}
if trace {
c.traceInOp("HPUB", arg)
}
if err := c.processHeaderPub(arg); err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
// If we don't have a saved buffer then jump ahead with
// the index. If this overruns what is left we fall out
// and process split buffer.
if c.msgBuf == nil {
i = c.as + c.pa.size - LEN_CR_LF
}
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
}
}
case OP_P:
switch b {
case 'U', 'u':
@@ -207,6 +283,7 @@ func (c *client) parse(buf []byte) error {
case ' ', '\t':
continue
default:
c.pa.hdr = -1
c.state = PUB_ARG
c.as = i
}
@@ -290,7 +367,7 @@ func (c *client) parse(buf []byte) error {
c.drop, c.as, c.state = 0, i+1, OP_START
// Drop all pub args
c.pa.arg, c.pa.pacache, c.pa.account, c.pa.subject = nil, nil, nil, nil
c.pa.reply, c.pa.size, c.pa.szb, c.pa.queues = nil, 0, nil, nil
c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.queues = nil, -1, 0, nil, nil
case OP_A:
switch b {
case '+':
@@ -895,7 +972,8 @@ func (c *client) parse(buf []byte) error {
}
// Check for split buffer scenarios for any ARG state.
if c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG ||
if c.state == SUB_ARG || c.state == UNSUB_ARG ||
c.state == PUB_ARG || c.state == HPUB_ARG ||
c.state == ASUB_ARG || c.state == AUSUB_ARG ||
c.state == MSG_ARG || c.state == MINUS_ERR_ARG ||
c.state == CONNECT_ARG || c.state == INFO_ARG {
@@ -985,6 +1063,10 @@ func (c *client) clonePubArg() error {
case LEAF:
return c.processLeafMsgArgs(c.argBuf)
default:
return c.processPub(c.argBuf)
if c.pa.hdr < 0 {
return c.processPub(c.argBuf)
} else {
return c.processHeaderPub(c.argBuf)
}
}
}