first pass server

This commit is contained in:
Derek Collison
2012-11-12 14:26:38 -08:00
parent 05ae8b55b8
commit 6630efb298
9 changed files with 1508 additions and 0 deletions

314
server/client.go Normal file
View File

@@ -0,0 +1,314 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server
import (
"bufio"
"encoding/json"
"fmt"
"log"
"math/rand"
"net"
"sync"
"time"
"github.com/apcera/gnatsd/hashmap"
)
// The size of the bufio reader/writer on top of the socket.
//const defaultBufSize = 32768
const defaultBufSize = 65536
type client struct {
mu sync.Mutex
cid uint64
opts clientOpts
conn net.Conn
bw *bufio.Writer
br *bufio.Reader
srv *Server
subs *hashmap.HashMap
cstats
parseState
}
func (c *client) String() string {
return fmt.Sprintf("cid:%d", c.cid)
}
type cstats struct {
nr int
nb int
nm int
}
type subscription struct {
client *client
subject []byte
queue []byte
sid []byte
nm int64
max int64
}
type clientOpts struct {
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
SslRequired bool `json:"ssl_required"`
}
func init() {
rand.Seed(time.Now().UnixNano())
}
func (c *client) readLoop() {
b := make([]byte, defaultBufSize)
// log.Printf("b len = %d, cap = %d\n", len(b), cap(b))
for {
n, err := c.conn.Read(b)
if err != nil {
// log.Printf("Encountered a read error: %v\n", err)
c.closeConnection()
return
}
if err := c.parse(b[:n]); err != nil {
log.Printf("Parse Error: %v\n", err)
c.closeConnection()
return
}
}
}
func (c *client) processConnect(arg []byte) error {
// log.Printf("Got connect arg: '%s'\n", arg)
// FIXME, check err
return json.Unmarshal(arg, &c.opts)
}
var pongResp = []byte(fmt.Sprintf("PONG%s", CR_LF))
func (c *client) processPing() {
// log.Printf("Process ping\n")
if c.conn == nil {
return
}
// FIXME, check err
c.conn.Write(pongResp)
}
const argsLenMax = 3
func (c *client) processPub(arg []byte) error {
// log.Printf("Got pub arg: '%s'\n", arg)
args := splitArg(arg)
switch len(args) {
case 2:
c.pa.subject = args[0]
c.pa.reply = nil
c.pa.size = parseSize(args[1])
c.pa.szb = args[1]
case 3:
c.pa.subject = args[0]
c.pa.reply = args[1]
c.pa.size = parseSize(args[2])
c.pa.szb = args[2]
default:
return fmt.Errorf("processPub Parse Error: '%s'", arg)
}
if c.pa.size < 0 {
return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg)
}
// log.Printf("Parsed pubArg: %+v\n", c.pa)
return nil
}
func splitArg(arg []byte) [][]byte {
a := [argsLenMax][]byte{}
args := a[:0]
start := -1
for i, b := range arg {
switch b {
case ' ', '\t', '\r', '\n':
if start >= 0 {
args = append(args, arg[start:i])
start = -1
}
default:
if start < 0 {
start = i
}
}
}
if start >= 0 {
args = append(args, arg[start:])
}
return args
}
func (c *client) processSub(argo []byte) error {
// Copy so we do not reference a potentially large buffer
arg := make([]byte, len(argo))
copy(arg, argo)
// log.Printf("Got sub arg for client[%v]: '%s'\n", c, arg)
args := splitArg(arg)
sub := &subscription{client: c}
switch len(args) {
case 2:
sub.subject = args[0]
sub.queue = nil
sub.sid = args[1]
case 3:
sub.subject = args[0]
sub.queue = args[1]
sub.sid = args[2]
default:
return fmt.Errorf("processSub Parse Error: '%s'", arg)
}
// log.Printf("sub.subject = '%s'\n", sub.subject)
// log.Printf("sub.queue = '%s'\n", sub.queue)
// log.Printf("sub.sid = '%s'\n", sub.sid)
if c.subs != nil {
c.subs.Set(sub.sid, sub)
}
if c.srv != nil {
c.srv.sl.Insert(sub.subject, sub)
}
return nil
}
func (c *client) unsubscribe(sub *subscription) {
if sub.max > 0 && sub.nm <= sub.max {
return
}
c.subs.Remove(sub.sid)
if c.srv != nil {
c.srv.sl.Remove(sub.subject, sub)
}
}
func (c *client) processUnsub(arg []byte) error {
// log.Printf("Got unsub arg for client[%v]: '%s'\n", c, arg)
args := splitArg(arg)
var sid []byte
max := -1
switch len(args) {
case 1:
sid = args[0]
case 2:
sid = args[0]
max = parseSize(args[1])
default:
return fmt.Errorf("processUnsub Parse Error: '%s'", arg)
}
sub := (c.subs.Get(sid)).(*subscription)
if max > 0 {
sub.max = int64(max)
}
c.unsubscribe(sub)
return nil
}
func (c *client) msgHeader(mh []byte, sub *subscription) []byte {
mh = append(mh, sub.sid...)
mh = append(mh, ' ')
if c.pa.reply != nil {
mh = append(mh, c.pa.reply...)
mh = append(mh, ' ')
}
mh = append(mh, c.pa.szb...)
mh = append(mh, "\r\n"...)
return mh
}
func (sub *subscription) deliverMsg(mh, msg []byte) {
sub.nm++
if sub.client == nil || sub.client.conn == nil {
return
}
if sub.max > 0 && sub.nm > sub.max {
sub.client.unsubscribe(sub)
return
}
sub.client.mu.Lock()
sub.client.bw.Write(mh)
sub.client.bw.Write(msg)
sub.client.bw.WriteString("\r\n")
// FIXME: Make efficient with flusher..
sub.client.bw.Flush()
sub.client.mu.Unlock()
}
// TODO
// Block pub goroutine on bufio locked write buffer with
// go flusher routine. Single for all connections?
func (c *client) processMsg(msg []byte) {
c.nm++
if c.srv == nil {
return
}
qsubsA := [32]*subscription{}
qsubs := qsubsA[:0]
scratch := [512]byte{}
msgh := scratch[:0]
r := c.srv.sl.Match(c.pa.subject)
if len(r) <= 0 {
return
}
// msg header
msgh = append(msgh, "MSG "...)
msgh = append(msgh, c.pa.subject...)
msgh = append(msgh, ' ')
si := len(msgh)
for _, v := range r {
sub := v.(*subscription)
if sub.queue != nil {
qsubs = append(qsubs, sub)
continue
}
mh := c.msgHeader(msgh[:si], sub)
sub.deliverMsg(mh, msg)
}
if len(qsubs) > 0 {
index := rand.Int() % len(qsubs)
sub := qsubs[index]
mh := c.msgHeader(msgh[:si], sub)
sub.deliverMsg(mh, msg)
}
}
func (c *client) closeConnection() {
if c.conn == nil {
return
}
// log.Printf("Closing Connection: %v\n", c)
// c.bw.Flush()
c.conn.Close()
c.conn = nil
if c.srv != nil {
subs := c.subs.All()
for _, s := range subs {
sub := s.(*subscription)
c.srv.sl.Remove(sub.subject, sub)
}
}
/*
log.Printf("Sublist Stats: %+v\n", c.srv.sl.Stats())
if c.nr > 0 {
log.Printf("stats: %d %d %d\n", c.nr, c.nb, c.nm)
log.Printf("bytes per read: %d\n", c.nb/c.nr)
log.Printf("msgs per read: %d\n", c.nm/c.nr)
}
*/
}

331
server/client_test.go Normal file
View File

@@ -0,0 +1,331 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server
import (
"bufio"
"bytes"
"encoding/json"
"net"
"reflect"
"regexp"
"strings"
"testing"
)
type serverInfo struct {
Id string `json:"server_id"`
Host string `json:"host"`
Port uint `json:"port"`
Version string `json:"version"`
AuthRequired bool `json:"auth_required"`
SslRequired bool `json:"ssl_required"`
MaxPayload int64 `json:"max_payload"`
}
func createClientAsync(ch chan *client, s *Server, cli net.Conn) {
go func() {
c := s.createClient(cli)
ch <- c
}()
}
func rawSetup() (*Server, *client, *bufio.Reader, string) {
cli, srv := net.Pipe()
cr := bufio.NewReaderSize(cli, defaultBufSize)
s := New()
ch := make(chan *client)
createClientAsync(ch, s, srv)
l, _ := cr.ReadString('\n')
// Grab client
c := <-ch
return s, c, cr, l
}
func setUpClientWithResponse() (*client, string) {
_, c, _, l := rawSetup()
return c, l
}
func setupClient() (*Server, *client, *bufio.Reader) {
s, c, cr, _ := rawSetup()
return s, c, cr
}
func TestClientCreateAndInfo(t *testing.T) {
c, l := setUpClientWithResponse()
if c.cid != 1 {
t.Fatalf("Expected cid of 1 vs %d\n", c.cid)
}
if c.state != OP_START {
t.Fatal("Expected state to be OP_START")
}
if !strings.HasPrefix(l, "INFO ") {
t.Fatalf("INFO response incorrect: %s\n", l)
}
// Make sure payload is proper json
var info serverInfo
err := json.Unmarshal([]byte(l[5:]), &info)
if err != nil {
t.Fatalf("Could not parse INFO json: %v\n", err)
}
// Sanity checks
if info.MaxPayload != MAX_PAYLOAD_SIZE ||
info.AuthRequired || info.SslRequired ||
info.Port != DEFAULT_PORT {
t.Fatalf("INFO inconsistent: %+v\n", info)
}
}
func TestClientConnect(t *testing.T) {
_, c, _ := setupClient()
// Basic Connect
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n")
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{true, true, false}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
}
func TestClientPing(t *testing.T) {
_, c, cr := setupClient()
// PING
pingOp := []byte("PING\r\n")
go c.parse(pingOp)
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving info from server: %v\n", err)
}
if !strings.HasPrefix(l, "PONG\r\n") {
t.Fatalf("PONG response incorrect: %s\n", l)
}
}
var msgPat = regexp.MustCompile(`\AMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n`)
const (
SUB_INDEX = 1
SID_INDEX = 2
REPLY_INDEX = 4
LEN_INDEX = 5
)
func checkPayload(cr *bufio.Reader, expected []byte, t *testing.T) {
// Read in payload
d := make([]byte, len(expected))
n, err := cr.Read(d)
if err != nil {
t.Fatalf("Error receiving msg payload from server: %v\n", err)
}
if n != len(expected) {
t.Fatalf("Did not read correct amount of bytes: %d vs %d\n", n, len(expected))
}
if !bytes.Equal(d, expected) {
t.Fatalf("Did not read correct payload:: <%s>\n", d)
}
}
func TestClientSimplePubSub(t *testing.T) {
_, c, cr := setupClient()
// SUB/PUB
go c.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\n"))
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if len(matches) != 6 {
t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6)
}
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
}
if matches[LEN_INDEX] != "5" {
t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX])
}
checkPayload(cr, []byte("hello\r\n"), t)
}
func TestClientSimplePubSubWithReply(t *testing.T) {
_, c, cr := setupClient()
// SUB/PUB
go c.parse([]byte("SUB foo 1\r\nPUB foo bar 5\r\nhello\r\n"))
l, err := cr.ReadString('\n')
if err != nil {
t.Fatalf("Error receiving msg from server: %v\n", err)
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if len(matches) != 6 {
t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6)
}
if matches[SUB_INDEX] != "foo" {
t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX])
}
if matches[SID_INDEX] != "1" {
t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX])
}
if matches[REPLY_INDEX] != "bar" {
t.Fatalf("Did not get correct reply subject: '%s'\n", matches[REPLY_INDEX])
}
if matches[LEN_INDEX] != "5" {
t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX])
}
}
func TestClientPubWithQueueSub(t *testing.T) {
_, c, cr := setupClient()
num := 10
// Queue SUB/PUB
subs := []byte("SUB foo g1 1\r\nSUB foo g1 2\r\n")
pubs := []byte("PUB foo bar 5\r\nhello\r\n")
op := []byte{}
op = append(op, subs...)
for i := 0; i < num; i++ {
op = append(op, pubs...)
}
go func() {
c.parse(op)
c.conn.Close()
}()
var n1, n2, received int
for ; ; received += 1 {
l, err := cr.ReadString('\n')
if err != nil {
break
}
matches := msgPat.FindAllStringSubmatch(l,-1)[0]
// Count which sub
switch matches[SID_INDEX] {
case "1":
n1++
case "2":
n2++
}
checkPayload(cr, []byte("hello\r\n"), t)
}
if received != num {
t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, num)
}
// Threshold for randomness for now
if n1 < 2 || n2 < 2 {
t.Fatalf("Received wrong # of msgs per subscriber: %d - %d\n", n1, n2)
}
}
func TestClientUnSub(t *testing.T) {
_, c, cr := setupClient()
num := 1
// Queue SUB/PUB
subs := []byte("SUB foo 1\r\nSUB foo 2\r\n")
unsub := []byte("UNSUB 1\r\n")
pub := []byte("PUB foo bar 5\r\nhello\r\n")
op := []byte{}
op = append(op, subs...)
op = append(op, unsub...)
op = append(op, pub...)
go func() {
c.parse(op)
c.conn.Close()
}()
var received int
for ; ; received += 1 {
l, err := cr.ReadString('\n')
if err != nil {
break
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if matches[SID_INDEX] != "2" {
t.Fatalf("Received msg on unsubscribed subscription!\n")
}
checkPayload(cr, []byte("hello\r\n"), t)
}
if received != num {
t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, num)
}
}
func TestClientUnSubMax(t *testing.T) {
_, c, cr := setupClient()
num := 10
exp := 5
// Queue SUB/PUB
subs := []byte("SUB foo 1\r\n")
unsub := []byte("UNSUB 1 5\r\n")
pub := []byte("PUB foo bar 5\r\nhello\r\n")
op := []byte{}
op = append(op, subs...)
op = append(op, unsub...)
for i := 0; i < num; i++ {
op = append(op, pub...)
}
go func() {
c.parse(op)
c.conn.Close()
}()
var received int
for ; ; received += 1 {
l, err := cr.ReadString('\n')
if err != nil {
break
}
matches := msgPat.FindAllStringSubmatch(l, -1)[0]
if matches[SID_INDEX] != "1" {
t.Fatalf("Received msg on unsubscribed subscription!\n")
}
checkPayload(cr, []byte("hello\r\n"), t)
}
if received != exp {
t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, exp)
}
}
func TestClientRemoveSubsOnDisconnect(t *testing.T) {
s, c, _ := setupClient()
subs := []byte("SUB foo 1\r\nSUB bar 2\r\n")
ch := make(chan bool)
go func() {
c.parse(subs)
ch <- true
}()
<-ch
if s.sl.Count() != 2 {
t.Fatalf("Should have 2 subscriptions, got %d\n", s.sl.Count())
}
c.closeConnection()
if s.sl.Count() != 0 {
t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count())
}
}

38
server/const.go Normal file
View File

@@ -0,0 +1,38 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server
import (
"time"
)
const (
VERSION = "0.1.0.alpha.1"
DEFAULT_PORT = 4222
DEFAULT_HOST = "0.0.0.0"
// 1k should be plenty since payloads sans connect string are separate
MAX_CONTROL_LINE_SIZE = 1024
// Should be using something different if > 1MB payload
MAX_PAYLOAD_SIZE = (1024 * 1024)
// Maximum outbound size per client
MAX_PENDING_SIZE = (10 * 1024 * 1024)
// Maximum connections default
DEFAULT_MAX_CONNECTIONS = (64 * 1024)
// TLS/SSL wait time
SSL_TIMEOUT = 500 * time.Millisecond
// Authorization wait time
AUTH_TIMEOUT = 2 * SSL_TIMEOUT
// Ping intervals
DEFAULT_PING_INTERVAL = 2 * time.Minute
DEFAULT_PING_MAX = 2
CR_LF = "\r\n"
)

9
server/errors.go Normal file
View File

@@ -0,0 +1,9 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server
import "errors"
var (
ErrConnectionClosed = errors.New("Connection closed")
)

345
server/parser.go Normal file
View File

@@ -0,0 +1,345 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server
import (
"fmt"
// "log"
)
type pubArg struct {
subject []byte
reply []byte
szb []byte
size int
}
type parseState struct {
state int
as int
drop int
pa pubArg
argBuf []byte
msgBuf []byte
scratch [MAX_CONTROL_LINE_SIZE]byte
}
const (
OP_START = iota
OP_C
OP_CO
OP_CON
OP_CONN
OP_CONNE
OP_CONNEC
OP_CONNECT
CONNECT_ARG
OP_P
OP_PU
OP_PUB
PUB_ARG
OP_PI
OP_PIN
OP_PING
MSG_PAYLOAD
MSG_END
OP_S
OP_SU
OP_SUB
SUB_ARG
OP_U
OP_UN
OP_UNS
OP_UNSU
OP_UNSUB
UNSUB_ARG
)
func (c *client) parse(buf []byte) error {
var i int
var b byte
// log.Printf("parse: len = %d, cap = %d\n", len(buf), cap(buf))
// log.Printf("Parse bytes: '%s'\n", buf)
c.nr++
c.nb += len(buf)
for i, b = range buf {
switch c.state {
case OP_START:
switch b {
case 'C', 'c':
c.state = OP_C
case 'P', 'p':
c.state = OP_P
case 'S', 's':
c.state = OP_S
case 'U', 'u':
c.state = OP_U
default:
goto parseErr
}
case OP_P:
switch b {
case 'U', 'u':
c.state = OP_PU
case 'I', 'i':
c.state = OP_PI
default:
goto parseErr
}
case OP_PU:
switch b {
case 'B', 'b':
c.state = OP_PUB
default:
goto parseErr
}
case OP_PUB:
switch b {
case ' ', '\t':
continue
default:
c.state = PUB_ARG
c.as = i
}
case PUB_ARG:
switch b {
case '\r':
c.drop = 1
case '\n':
var arg []byte
if c.argBuf != nil {
arg = c.argBuf
c.argBuf = nil
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processPub(arg); err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
}
}
case MSG_PAYLOAD:
if c.msgBuf != nil {
if len(c.msgBuf) >= c.pa.size {
c.processMsg(c.msgBuf)
c.msgBuf, c.state = nil, MSG_END
} else {
c.msgBuf = append(c.msgBuf, b)
}
} else if i-c.as >= c.pa.size {
c.processMsg(buf[c.as:i])
c.state = MSG_END
}
case MSG_END:
switch b {
case '\n':
c.drop, c.as, c.state = 0, i+1, OP_START
default:
continue
}
case OP_S:
switch b {
case 'U', 'u':
c.state = OP_SU
default:
goto parseErr
}
case OP_SU:
switch b {
case 'B', 'b':
c.state = OP_SUB
default:
goto parseErr
}
case OP_SUB:
switch b {
case ' ', '\t':
continue
default:
c.state = SUB_ARG
c.as = i
}
case SUB_ARG:
switch b {
case '\r':
c.drop = 1
case '\n':
var arg []byte
if c.argBuf != nil {
arg = c.argBuf
c.argBuf = nil
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processSub(arg); err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, OP_START
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
}
}
case OP_U:
switch b {
case 'N', 'n':
c.state = OP_UN
default:
goto parseErr
}
case OP_UN:
switch b {
case 'S', 's':
c.state = OP_UNS
default:
goto parseErr
}
case OP_UNS:
switch b {
case 'U', 'u':
c.state = OP_UNSU
default:
goto parseErr
}
case OP_UNSU:
switch b {
case 'B', 'b':
c.state = OP_UNSUB
default:
goto parseErr
}
case OP_UNSUB:
switch b {
case ' ', '\t':
continue
default:
c.state = UNSUB_ARG
c.as = i
}
case UNSUB_ARG:
switch b {
case '\r':
c.drop = 1
case '\n':
var arg []byte
if c.argBuf != nil {
arg = c.argBuf
c.argBuf = nil
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processUnsub(arg); err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, OP_START
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
}
}
case OP_PI:
switch b {
case 'N', 'n':
c.state = OP_PIN
default:
goto parseErr
}
case OP_PIN:
switch b {
case 'G', 'g':
c.state = OP_PING
default:
goto parseErr
}
case OP_PING:
switch b {
case '\n':
c.processPing()
c.drop, c.state = 0, OP_START
}
case OP_C:
switch b {
case 'O', 'o':
c.state = OP_CO
default:
goto parseErr
}
case OP_CO:
switch b {
case 'N', 'n':
c.state = OP_CON
default:
goto parseErr
}
case OP_CON:
switch b {
case 'N', 'n':
c.state = OP_CONN
default:
goto parseErr
}
case OP_CONN:
switch b {
case 'E', 'e':
c.state = OP_CONNE
default:
goto parseErr
}
case OP_CONNE:
switch b {
case 'C', 'c':
c.state = OP_CONNEC
default:
goto parseErr
}
case OP_CONNEC:
switch b {
case 'T', 't':
c.state = OP_CONNECT
default:
goto parseErr
}
case OP_CONNECT:
switch b {
case ' ', '\t':
continue
default:
c.state = CONNECT_ARG
c.as = i
}
case CONNECT_ARG:
switch b {
case '\r':
c.drop = 1
case '\n':
if err := c.processConnect(buf[c.as : i-c.drop]); err != nil {
return err
}
c.drop, c.state = 0, OP_START
}
default:
goto parseErr
}
}
// Check for split buffer scenarios
if (c.state == SUB_ARG || c.state == PUB_ARG) && c.argBuf == nil {
c.argBuf = c.scratch[:0]
c.argBuf = append(c.argBuf, buf[c.as:(i+1)-c.drop]...)
// FIXME, check max len
}
if c.state == MSG_PAYLOAD && c.msgBuf == nil {
// FIXME: copy better here? Make whole buf if large?
c.msgBuf = c.scratch[:0]
c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
}
return nil
parseErr:
return fmt.Errorf("Parse Error [%d]: '%s'", c.state, buf[i:])
}

207
server/parser_test.go Normal file
View File

@@ -0,0 +1,207 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server
import (
"bytes"
"testing"
)
func dummyClient() *client {
return &client{}
}
func TestParsePing(t *testing.T) {
c := dummyClient()
if c.state != OP_START {
t.Fatalf("Expected OP_START vs %d\n", c.state)
}
ping := []byte("PING\r\n")
err := c.parse(ping[:1])
if err != nil || c.state != OP_P {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
err = c.parse(ping[1:2])
if err != nil || c.state != OP_PI {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
err = c.parse(ping[2:3])
if err != nil || c.state != OP_PIN {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
err = c.parse(ping[3:4])
if err != nil || c.state != OP_PING {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
err = c.parse(ping[4:5])
if err != nil || c.state != OP_PING {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
err = c.parse(ping[5:6])
if err != nil || c.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
err = c.parse(ping)
if err != nil || c.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
// Should tolerate spaces
ping = []byte("PING \r")
err = c.parse(ping)
if err != nil || c.state != OP_PING {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
c.state = OP_START
ping = []byte("PING \r \n")
err = c.parse(ping)
if err != nil || c.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
}
func TestParseConnect(t *testing.T) {
c := dummyClient()
connect := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n")
err := c.parse(connect)
if err != nil || c.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
// Check saved state
if c.as != 8 {
t.Fatalf("ArgStart state incorrect: 8 vs %d\n", c.as)
}
}
func TestParseSub(t *testing.T) {
c := dummyClient()
sub := []byte("SUB foo 1\r")
err := c.parse(sub)
if err != nil || c.state != SUB_ARG {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
// Check saved state
if c.as != 4 {
t.Fatalf("ArgStart state incorrect: 4 vs %d\n", c.as)
}
if c.drop != 1 {
t.Fatalf("Drop state incorrect: 1 vs %d\n", c.as)
}
if !bytes.Equal(sub[c.as:], []byte("foo 1\r")) {
t.Fatalf("Arg state incorrect: %s\n", sub[c.as:])
}
}
func TestParsePub(t *testing.T) {
c := dummyClient()
pub := []byte("PUB foo 5\r\nhello\r")
err := c.parse(pub)
if err != nil || c.state != MSG_END {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if !bytes.Equal(c.pa.subject, []byte("foo")) {
t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", string(c.pa.subject))
}
if c.pa.reply != nil {
t.Fatalf("Did not parse reply correctly: 'nil' vs '%s'\n", string(c.pa.reply))
}
if c.pa.size != 5 {
t.Fatalf("Did not parse msg size correctly: 5 vs %d\n", c.pa.size)
}
c.state = OP_START
pub = []byte("PUB foo.bar INBOX.22 11\r\nhello world\r")
err = c.parse(pub)
if err != nil || c.state != MSG_END {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if !bytes.Equal(c.pa.subject, []byte("foo.bar")) {
t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", string(c.pa.subject))
}
if !bytes.Equal(c.pa.reply, []byte("INBOX.22")) {
t.Fatalf("Did not parse reply correctly: 'INBOX.22' vs '%s'\n", string(c.pa.reply))
}
if c.pa.size != 11 {
t.Fatalf("Did not parse msg size correctly: 11 vs %d\n", c.pa.size)
}
}
func testPubArg(c *client, t *testing.T) {
if !bytes.Equal(c.pa.subject, []byte("foo")) {
t.Fatalf("Mismatched subject: '%s'\n", c.pa.subject)
}
if !bytes.Equal(c.pa.szb, []byte("22")) {
t.Fatalf("Bad size buf: '%s'\n", c.pa.szb)
}
if c.pa.size != 22 {
t.Fatalf("Bad size: %d\n", c.pa.size)
}
}
func TestParsePubArg(t *testing.T) {
c := dummyClient()
if err := c.processPub([]byte("foo 22")) ; err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
testPubArg(c, t)
if err := c.processPub([]byte(" foo 22")) ; err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
testPubArg(c, t)
if err := c.processPub([]byte(" foo 22 ")) ; err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
testPubArg(c, t)
if err := c.processPub([]byte("foo 22")) ; err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if err := c.processPub([]byte("foo 22\r")) ; err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
testPubArg(c, t)
}
func TestShouldFail(t *testing.T) {
c := dummyClient()
if err := c.parse([]byte(" PING")) ; err == nil {
t.Fatal("Should have received a parse error")
}
c.state = OP_START
if err := c.parse([]byte("CONNECT \r\n")) ; err == nil {
t.Fatal("Should have received a parse error")
}
c.state = OP_START
if err := c.parse([]byte("Po")) ; err == nil {
t.Fatal("Should have received a parse error")
}
c.state = OP_START
if err := c.parse([]byte("PUB foo\r\n")) ; err == nil {
t.Fatal("Should have received a parse error")
}
c.state = OP_START
if err := c.parse([]byte("PUB \r\n")) ; err == nil {
t.Fatal("Should have received a parse error")
}
c.state = OP_START
if err := c.parse([]byte("PUB foo bar \r\n")) ; err == nil {
t.Fatal("Should have received a parse error")
}
c.state = OP_START
if err := c.parse([]byte("SUB\r\n")) ; err == nil {
t.Fatal("Should have received a parse error")
}
c.state = OP_START
if err := c.parse([]byte("SUB \r\n")) ; err == nil {
t.Fatal("Should have received a parse error")
}
c.state = OP_START
if err := c.parse([]byte("SUB foo\r\n")) ; err == nil {
t.Fatal("Should have received a parse error")
}
c.state = OP_START
if err := c.parse([]byte("SUB foo bar baz 22\r\n")) ; err == nil {
t.Fatal("Should have received a parse error")
}
}

97
server/server.go Normal file
View File

@@ -0,0 +1,97 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server
import (
"bufio"
"encoding/json"
"fmt"
"log"
"net"
"sync/atomic"
"github.com/apcera/gnatsd/hashmap"
"github.com/apcera/gnatsd/sublist"
)
type info struct {
Id string `json:"server_id"`
Version string `json:"version"`
Host string `json:"host"`
Port uint `json:"port"`
AuthRequired bool `json:"auth_required"`
SslRequired bool `json:"ssl_required"`
MaxPayload int `json:"max_payload"`
}
type Server struct {
info info
infoJson []byte
sl *sublist.Sublist
gcid uint64
}
func New() *Server {
s := &Server{
info: info{
Id: genId(),
Version: VERSION,
Host: DEFAULT_HOST,
Port: DEFAULT_PORT,
AuthRequired: false,
SslRequired: false,
MaxPayload: MAX_PAYLOAD_SIZE,
},
sl: sublist.New(),
}
// Generate the info json
b, err := json.Marshal(s.info)
if err != nil {
log.Fatalf("Err marshalling INFO JSON: %+v\n", err)
}
s.infoJson = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF))
return s
}
func (s *Server) AcceptLoop() {
l, e := net.Listen("tcp", "0.0.0.0:4222")
if e != nil {
println(e)
return
}
log.Println("Listening on ", l.Addr())
for {
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Printf("Accept error: %v", err)
}
continue
}
s.createClient(conn)
}
}
func (s *Server) createClient(conn net.Conn) *client {
c := &client{srv: s, conn: conn}
c.cid = atomic.AddUint64(&s.gcid, 1)
// log.Printf("Creating Client: %+v\n", c)
c.bw = bufio.NewWriterSize(c.conn, defaultBufSize)
c.br = bufio.NewReaderSize(c.conn, defaultBufSize)
c.subs = hashmap.New()
/*
if ipc := conn.(*net.TCPConn) ; ipc != nil {
ipc.SetReadBuffer(65536)
}
*/
s.sendInfo(c)
go c.readLoop()
return c
}
func (s *Server) sendInfo(c *client) {
// FIXME, err
c.conn.Write(s.infoJson)
}

131
server/split_test.go Normal file
View File

@@ -0,0 +1,131 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server
import (
"bytes"
"testing"
"github.com/apcera/gnatsd/sublist"
)
func TestSplitBufferSubOp(t *testing.T) {
s := &Server{ sl: sublist.New() }
c := &client{srv:s}
subop := []byte("SUB foo 1\r\n")
subop1 := subop[:6]
subop2 := subop[6:]
if err := c.parse(subop1) ; err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != SUB_ARG {
t.Fatalf("Expected SUB_ARG state vs %d\n", c.state)
}
if err := c.parse(subop2) ; err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
r := s.sl.Match([]byte("foo"))
if r == nil || len(r) != 1 {
t.Fatalf("Did not match subscription properly: %+v\n", r)
}
sub := r[0].(*subscription)
if !bytes.Equal(sub.subject, []byte("foo")) {
t.Fatalf("Subject did not match expected 'foo' : '%s'\n", sub.subject)
}
if !bytes.Equal(sub.sid, []byte("1")) {
t.Fatalf("Sid did not match expected '1' : '%s'\n", sub.sid)
}
if sub.queue != nil {
t.Fatalf("Received a non-nil queue: '%s'\n", sub.queue)
}
}
func TestSplitBufferPubOp(t *testing.T) {
c := &client{}
pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r")
pub1 := pub[:2]
pub2 := pub[2:9]
pub3 := pub[9:15]
pub4 := pub[15:22]
pub5 := pub[22:25]
pub6 := pub[25:33]
pub7 := pub[33:]
if err := c.parse(pub1); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != OP_PU {
t.Fatalf("Expected OP_PU state vs %d\n", c.state)
}
if err := c.parse(pub2); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != PUB_ARG {
t.Fatalf("Expected OP_PU state vs %d\n", c.state)
}
if err := c.parse(pub3); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != PUB_ARG {
t.Fatalf("Expected OP_PU state vs %d\n", c.state)
}
if err := c.parse(pub4); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != PUB_ARG {
t.Fatalf("Expected PUB_ARG state vs %d\n", c.state)
}
if err := c.parse(pub5); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != MSG_PAYLOAD {
t.Fatalf("Expected MSG_PAYLOAD state vs %d\n", c.state)
}
// Check c.pa
if !bytes.Equal(c.pa.subject, []byte("foo.bar")) {
t.Fatalf("PUB arg subject incorrect: '%s'\n", c.pa.subject)
}
if !bytes.Equal(c.pa.reply, []byte("INBOX.22")) {
t.Fatalf("PUB arg reply subject incorrect: '%s'\n", c.pa.reply)
}
if c.pa.size != 11 {
t.Fatalf("PUB arg msg size incorrect: %d\n", c.pa.size)
}
if err := c.parse(pub6); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != MSG_PAYLOAD {
t.Fatalf("Expected MSG_PAYLOAD state vs %d\n", c.state)
}
if err := c.parse(pub7); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != MSG_END {
t.Fatalf("Expected MSG_END state vs %d\n", c.state)
}
}
func TestSplitBufferPubOp2(t *testing.T) {
c := &client{}
pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r\n")
pub1 := pub[:30]
pub2 := pub[30:]
if err := c.parse(pub1); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != MSG_PAYLOAD {
t.Fatalf("Expected MSG_PAYLOAD state vs %d\n", c.state)
}
if err := c.parse(pub2); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
}

36
server/util.go Normal file
View File

@@ -0,0 +1,36 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server
import (
"crypto/rand"
"encoding/hex"
"io"
)
func genId() string {
u := make([]byte, 16)
io.ReadFull(rand.Reader, u)
return hex.EncodeToString(u)
}
// Ascii numbers 0-9
const (
ascii_0 = 48
ascii_9 = 57
)
// parseSize expects decimal positive numbers. We
// return -1 to signal error
func parseSize(d []byte) (n int) {
if len(d) == 0 {
return -1
}
for _, dec := range d {
if dec < ascii_0 || dec > ascii_9 {
return -1
}
n = n*10 + (int(dec) - ascii_0)
}
return n
}