MSG proto processing

This commit is contained in:
Derek Collison
2013-07-29 15:48:07 -07:00
parent 5ab044e2c8
commit ec4a46c959
5 changed files with 270 additions and 55 deletions

View File

@@ -241,7 +241,48 @@ func (c *client) processPong() {
c.mu.Unlock()
}
const argsLenMax = 3
func (c *client) processMsgArgs(arg []byte) error {
// Unroll splitArgs to avoid runtime/heap issues
a := [MAX_MSG_ARGS][]byte{}
args := a[:0]
start := -1
for i, b := range arg {
switch b {
case ' ', '\t', '\r', '\n':
if start >= 0 {
args = append(args, arg[start:i])
start = -1
}
default:
if start < 0 {
start = i
}
}
}
if start >= 0 {
args = append(args, arg[start:])
}
switch len(args) {
case 3:
c.pa.subject = args[0]
c.pa.reply = nil
c.pa.szb = args[2]
c.pa.size = parseSize(args[2])
case 4:
c.pa.subject = args[0]
c.pa.reply = args[2]
c.pa.szb = args[3]
c.pa.size = parseSize(args[3])
default:
return fmt.Errorf("processMsgArgs Parse Error: '%s'", arg)
}
if c.pa.size < 0 {
return fmt.Errorf("processMsgArgs Bad or Missing Size: '%s'", arg)
}
return nil
}
func (c *client) processPub(arg []byte) error {
if trace > 0 {
@@ -249,7 +290,7 @@ func (c *client) processPub(arg []byte) error {
}
// Unroll splitArgs to avoid runtime/heap issues
a := [argsLenMax][]byte{}
a := [MAX_PUB_ARGS][]byte{}
args := a[:0]
start := -1
for i, b := range arg {
@@ -293,7 +334,7 @@ func (c *client) processPub(arg []byte) error {
}
func splitArg(arg []byte) [][]byte {
a := [argsLenMax][]byte{}
a := [MAX_MSG_ARGS][]byte{}
args := a[:0]
start := -1
for i, b := range arg {
@@ -414,6 +455,7 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte {
return mh
}
// FIXME(dlc) extra type might negate..
// Used to treat map as efficient set
type empty struct{}
@@ -653,8 +695,10 @@ func (c *client) clearConnection() {
func (c *client) typeString() string {
switch c.typ {
case CLIENT: return "Client"
case ROUTER: return "Router"
case CLIENT:
return "Client"
case ROUTER:
return "Router"
}
return "Unknown Type"
}

View File

@@ -51,4 +51,12 @@ const (
// Route dial timeout
DEFAULT_ROUTE_DIAL = 2 * time.Second
// Default size of proto to print on parse errors
PROTO_SNIPPET_SIZE = 32
// Maximum number of arguments from MSG proto
MAX_MSG_ARGS = 4
// Maximum number of arguments from PUB proto
MAX_PUB_ARGS = 3
)

View File

@@ -57,6 +57,11 @@ const (
OP_UNSU
OP_UNSUB
UNSUB_ARG
OP_M
OP_MS
OP_MSG
OP_MSG_SPC
MSG_ARG
)
func (c *client) parse(buf []byte) error {
@@ -78,6 +83,8 @@ func (c *client) parse(buf []byte) error {
c.state = OP_S
case 'U', 'u':
c.state = OP_U
case 'M', 'm':
c.state = OP_M
default:
goto parseErr
}
@@ -106,7 +113,7 @@ func (c *client) parse(buf []byte) error {
default:
goto parseErr
}
case OP_PUB_SPC:
case OP_PUB_SPC:
switch b {
case ' ', '\t':
continue
@@ -360,6 +367,55 @@ func (c *client) parse(buf []byte) error {
}
c.drop, c.state = 0, OP_START
}
case OP_M:
switch b {
case 'S', 's':
c.state = OP_MS
default:
goto parseErr
}
case OP_MS:
switch b {
case 'G', 'g':
c.state = OP_MSG
default:
goto parseErr
}
case OP_MSG:
switch b {
case ' ', '\t':
c.state = OP_MSG_SPC
default:
goto parseErr
}
case OP_MSG_SPC:
switch b {
case ' ', '\t':
continue
default:
c.state = MSG_ARG
c.as = i
}
case MSG_ARG:
switch b {
case '\r':
c.drop = 1
case '\n':
var arg []byte
if c.argBuf != nil {
arg = c.argBuf
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processMsgArgs(arg); err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
}
}
default:
goto parseErr
}
@@ -390,13 +446,19 @@ authErr:
parseErr:
c.sendErr("Unknown Protocol Operation")
stop := i + 32
if stop > len(buf) {
stop = len(buf)-1
}
return fmt.Errorf("Parse Error, state=%d,i=%d: '%s'", c.state, i, buf[i:stop])
snip := protoSnippet(i, buf)
err := fmt.Errorf("%s Parser ERROR, state=%d, i=%d: proto='%s...'",
c.typeString(), c.state, i, snip)
return err
}
func protoSnippet(start int, buf []byte) string {
stop := start + PROTO_SNIPPET_SIZE
if stop > len(buf) {
stop = len(buf) - 1
}
return fmt.Sprintf("%q", buf[start:stop])
}
// clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
// we need to hold onto it into the next read.

View File

@@ -5,7 +5,6 @@ package test
import (
"encoding/json"
"fmt"
"net"
"runtime"
"strings"
"testing"
@@ -35,18 +34,7 @@ func TestRouterListeningSocket(t *testing.T) {
// Check that the cluster socket is able to be connected.
addr := fmt.Sprintf("%s:%d", opts.ClusterHost, opts.ClusterPort)
end := time.Now().Add(2 * time.Second)
for time.Now().Before(end) {
conn, err := net.Dial("tcp", addr)
if err != nil {
time.Sleep(50 * time.Millisecond)
// Retry
continue
}
conn.Close()
return
}
t.Fatalf("Failed to connect to the cluster port: %q", addr)
checkSocket(t, addr, 2*time.Second)
}
func TestRouteGoServerShutdown(t *testing.T) {
@@ -64,8 +52,8 @@ func TestSendRouteInfoOnConnect(t *testing.T) {
s, opts := runRouteServer(t)
defer s.Shutdown()
rc := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
doRouteAuthConnect(t, rc, opts.ClusterUsername, opts.ClusterPassword)
buf := expectResult(t, rc, infoRe)
_, expect := setupRoute(t, rc, opts)
buf := expect(infoRe)
info := server.Info{}
if err := json.Unmarshal(buf[4:], &info); err != nil {
@@ -90,9 +78,10 @@ func TestSendRouteSubAndUnsub(t *testing.T) {
send, _ := setupConn(t, c)
// We connect to the route.
rc := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
expectAuthRequired(t, rc)
doRouteAuthConnect(t, rc, opts.ClusterUsername, opts.ClusterPassword)
setupRoute(t, rc, opts)
// Send SUB via client connection
send("SUB foo 22\r\n")
@@ -127,27 +116,75 @@ func TestSendRouteSolicit(t *testing.T) {
t.Fatalf("Need an outbound solicted route for this test")
}
rUrl := opts.Routes[0]
hp := rUrl.Host
l, e := net.Listen("tcp", hp)
if e != nil {
t.Fatalf("Error listening on %v", hp)
}
tl := l.(*net.TCPListener)
tl.SetDeadline(time.Now().Add(2 * server.DEFAULT_ROUTE_CONNECT))
conn, err := l.Accept()
if err != nil {
t.Fatalf("Did not receive a connection request: %v", err)
}
conn := acceptRouteConn(t, rUrl.Host, server.DEFAULT_ROUTE_CONNECT)
defer conn.Close()
// We should receive a connect message right away due to auth.
buf := expectResult(t, conn, connectRe)
// Check INFO follows. Could be inline, with first result, if not
// check again.
// check followon buffer.
if !inlineInfoRe.Match(buf) {
expectResult(t, conn, infoRe)
}
}
func TestRouteForwardsMsgFromClients(t *testing.T) {
s, opts := runRouteServer(t)
defer s.Shutdown()
client := createClientConn(t, opts.Host, opts.Port)
defer client.Close()
clientSend, _ := setupConn(t, client)
route := acceptRouteConn(t, opts.Routes[0].Host, server.DEFAULT_ROUTE_CONNECT)
defer route.Close()
routeSend, routeExpect := setupRoute(t, route, opts)
expectMsgs := expectMsgsCommand(t, routeExpect)
// Eat the CONNECT and INFO protos
buf := routeExpect(connectRe)
if !inlineInfoRe.Match(buf) {
fmt.Printf("Looking for separate INFO\n")
routeExpect(infoRe)
}
// Send SUB via route connection
routeSend("SUB foo RSID:2:22\r\n")
// Send PUB via client connection
clientSend("PUB foo 2\r\nok\r\n")
matches := expectMsgs(1)
checkMsg(t, matches[0], "foo", "RSID:2:22", "", "2", "ok")
}
func TestRouteForwardsMsgToClients(t *testing.T) {
s, opts := runRouteServer(t)
defer s.Shutdown()
client := createClientConn(t, opts.Host, opts.Port)
defer client.Close()
clientSend, clientExpect := setupConn(t, client)
expectMsgs := expectMsgsCommand(t, clientExpect)
route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort)
expectAuthRequired(t, route)
routeSend, _ := setupRoute(t, route, opts)
// Subscribe to foo
clientSend("SUB foo 1\r\n")
// Use ping roundtrip to make sure its processed.
clientSend("PING\r\n")
clientExpect(pongRe)
// Send MSG via route connection
routeSend("MSG foo 1 2\r\nok\r\n")
matches := expectMsgs(1)
checkMsg(t, matches[0], "foo", "1", "", "2", "ok")
}

View File

@@ -3,12 +3,12 @@
package test
import (
"bytes"
"encoding/json"
"fmt"
"net"
"os/exec"
"regexp"
"runtime"
"strings"
"time"
@@ -109,6 +109,45 @@ func (s *natsServer) stopServer() {
}
}
func stackFatalf(t tLogger, f string, args ...interface{}) {
lines := make([]string, 0, 32)
msg := fmt.Sprintf(f, args...)
lines = append(lines, msg)
// Ignore ourselves
_, testFile, _, _ := runtime.Caller(0)
// Generate the Stack of callers:
for i := 0; true; i++ {
_, file, line, ok := runtime.Caller(i)
if ok == false {
break
}
if file == testFile {
continue
}
msg := fmt.Sprintf("%d - %s:%d", i, file, line)
lines = append(lines, msg)
}
t.Fatalf("%s", strings.Join(lines, "\n"))
}
func acceptRouteConn(t tLogger, host string, timeout time.Duration) net.Conn {
l, e := net.Listen("tcp", host)
if e != nil {
stackFatalf(t, "Error listening for route connection on %v", host)
}
tl := l.(*net.TCPListener)
tl.SetDeadline(time.Now().Add(timeout))
conn, err := l.Accept()
if err != nil {
stackFatalf(t, "Did not receive a route connection request: %v", err)
}
return conn
}
func createRouteConn(t tLogger, host string, port int) net.Conn {
return createClientConn(t, host, port)
}
@@ -117,7 +156,7 @@ func createClientConn(t tLogger, host string, port int) net.Conn {
addr := fmt.Sprintf("%s:%d", host, port)
c, err := net.DialTimeout("tcp", addr, 1*time.Second)
if err != nil {
t.Fatalf("Could not connect to server: %v\n", err)
stackFatalf(t, "Could not connect to server: %v\n", err)
}
return c
}
@@ -128,7 +167,7 @@ func doConnect(t tLogger, c net.Conn, verbose, pedantic, ssl bool) {
var sinfo server.Info
err := json.Unmarshal(js, &sinfo)
if err != nil {
t.Fatalf("Could not unmarshal INFO json: %v\n", err)
stackFatalf(t, "Could not unmarshal INFO json: %v\n", err)
}
cs := fmt.Sprintf("CONNECT {\"verbose\":%v,\"pedantic\":%v,\"ssl_required\":%v}\r\n", verbose, pedantic, ssl)
sendProto(t, c, cs)
@@ -139,11 +178,37 @@ func doDefaultConnect(t tLogger, c net.Conn) {
doConnect(t, c, false, false, false)
}
func checkSocket(t tLogger, addr string, wait time.Duration) {
end := time.Now().Add(wait)
for time.Now().Before(end) {
conn, err := net.Dial("tcp", addr)
if err != nil {
time.Sleep(50 * time.Millisecond)
// Retry
continue
}
// We bound to the addr, so close and return success.
conn.Close()
return
}
// We have failed to bind the socket in the time allowed.
t.Fatalf("Failed to connect to the socket: %q", addr)
}
func doRouteAuthConnect(t tLogger, c net.Conn, user, pass string) {
cs := fmt.Sprintf("CONNECT {\"verbose\":false,\"user\":\"%s\",\"pass\":\"%s\"}\r\n", user, pass)
sendProto(t, c, cs)
}
func setupRoute(t tLogger, c net.Conn, opts *server.Options) (sendFun, expectFun) {
user := opts.ClusterUsername
pass := opts.ClusterPassword
doRouteAuthConnect(t, c, user, pass)
send := sendCommand(t, c)
expect := expectCommand(t, c)
return send, expect
}
func setupConn(t tLogger, c net.Conn) (sendFun, expectFun) {
doDefaultConnect(t, c)
send := sendCommand(t, c)
@@ -172,10 +237,10 @@ func expectCommand(t tLogger, c net.Conn) expectFun {
func sendProto(t tLogger, c net.Conn, op string) {
n, err := c.Write([]byte(op))
if err != nil {
t.Fatalf("Error writing command to conn: %v\n", err)
stackFatalf(t, "Error writing command to conn: %v\n", err)
}
if n != len(op) {
t.Fatalf("Partial write: %d vs %d\n", n, len(op))
stackFatalf(t, "Partial write: %d vs %d\n", n, len(op))
}
}
@@ -183,7 +248,7 @@ var (
infoRe = regexp.MustCompile(`\AINFO\s+([^\r\n]+)\r\n`)
pingRe = regexp.MustCompile(`\APING\r\n`)
pongRe = regexp.MustCompile(`\APONG\r\n`)
msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n([^\\r\\n]*?)\r\n)+?)`)
msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
okRe = regexp.MustCompile(`\A\+OK\r\n`)
errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`)
subRe = regexp.MustCompile(`\ASUB\s+([^\s]+)((\s+)([^\s]+))?\s+([^\s]+)\r\n`)
@@ -212,13 +277,12 @@ func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte {
n, err := c.Read(expBuf)
if n <= 0 && err != nil {
t.Fatalf("Error reading from conn: %v\n", err)
stackFatalf(t, "Error reading from conn: %v\n", err)
}
buf := expBuf[:n]
if !re.Match(buf) {
buf = bytes.Replace(buf, []byte("\r\n"), []byte("\\r\\n"), -1)
t.Fatalf("Response did not match expected: \n\tReceived:'%s'\n\tExpected:'%s'\n", buf, re)
stackFatalf(t, "Response did not match expected: \n\tReceived:'%q'\n\tExpected:'%s'\n", buf, re)
}
return buf
}
@@ -226,19 +290,19 @@ func expectResult(t tLogger, c net.Conn, re *regexp.Regexp) []byte {
// This will check that we got what we expected.
func checkMsg(t tLogger, m [][]byte, subject, sid, reply, len, msg string) {
if string(m[SUB_INDEX]) != subject {
t.Fatalf("Did not get correct subject: expected '%s' got '%s'\n", subject, m[SUB_INDEX])
stackFatalf(t, "Did not get correct subject: expected '%s' got '%s'\n", subject, m[SUB_INDEX])
}
if string(m[SID_INDEX]) != sid {
t.Fatalf("Did not get correct sid: exepected '%s' got '%s'\n", sid, m[SID_INDEX])
stackFatalf(t, "Did not get correct sid: expected '%s' got '%s'\n", sid, m[SID_INDEX])
}
if string(m[REPLY_INDEX]) != reply {
t.Fatalf("Did not get correct reply: exepected '%s' got '%s'\n", reply, m[REPLY_INDEX])
stackFatalf(t, "Did not get correct reply: expected '%s' got '%s'\n", reply, m[REPLY_INDEX])
}
if string(m[LEN_INDEX]) != len {
t.Fatalf("Did not get correct msg length: expected '%s' got '%s'\n", len, m[LEN_INDEX])
stackFatalf(t, "Did not get correct msg length: expected '%s' got '%s'\n", len, m[LEN_INDEX])
}
if string(m[MSG_INDEX]) != msg {
t.Fatalf("Did not get correct msg: expected '%s' got '%s'\n", msg, m[MSG_INDEX])
stackFatalf(t, "Did not get correct msg: expected '%s' got '%s'\n", msg, m[MSG_INDEX])
}
}
@@ -248,7 +312,7 @@ func expectMsgsCommand(t tLogger, ef expectFun) func(int) [][][]byte {
buf := ef(msgRe)
matches := msgRe.FindAllSubmatch(buf, -1)
if len(matches) != expected {
t.Fatalf("Did not get correct # msgs: %d vs %d\n", len(matches), expected)
stackFatalf(t, "Did not get correct # msgs: %d vs %d\n", len(matches), expected)
}
return matches
}