Enable verbose mode

This commit is contained in:
Derek Collison
2012-11-29 16:52:10 -08:00
parent 15e412a4bd
commit 56cfcac374
7 changed files with 117 additions and 30 deletions

View File

@@ -56,13 +56,14 @@ type clientOpts struct {
SslRequired bool `json:"ssl_required"`
}
var defaultOpts = clientOpts{true, true, false}
func init() {
rand.Seed(time.Now().UnixNano())
}
func (c *client) readLoop() {
b := make([]byte, defaultBufSize)
// log.Printf("b len = %d, cap = %d\n", len(b), cap(b))
for {
n, err := c.conn.Read(b)
if err != nil {
@@ -109,10 +110,19 @@ func (c *client) traceOp(op string, arg []byte) {
func (c *client) processConnect(arg []byte) error {
c.traceOp("CONNECT", arg)
// FIXME, check err
return json.Unmarshal(arg, &c.opts)
err := json.Unmarshal(arg, &c.opts)
if c.opts.Verbose {
c.sendOK()
}
return err
}
var pongResp = []byte(fmt.Sprintf("PONG%s", CR_LF))
func (c *client) sendOK() {
c.mu.Lock()
c.bw.WriteString("+OK\r\n")
c.pcd[c] = needFlush
c.mu.Unlock()
}
func (c *client) processPing() {
c.traceOp("PING", nil)
@@ -120,7 +130,7 @@ func (c *client) processPing() {
return
}
c.mu.Lock()
c.bw.Write(pongResp)
c.bw.WriteString("PONG\r\n")
err := c.bw.Flush()
c.mu.Unlock()
if err != nil {
@@ -221,12 +231,14 @@ func (c *client) processSub(argo []byte) error {
}
c.mu.Lock()
defer c.mu.Unlock()
c.subs.Set(sub.sid, sub)
if c.srv != nil {
c.srv.sl.Insert(sub.subject, sub)
}
c.mu.Unlock()
if c.opts.Verbose {
c.sendOK()
}
return nil
}
@@ -264,6 +276,9 @@ func (c *client) processUnsub(arg []byte) error {
}
c.unsubscribe(sub)
}
if c.opts.Verbose {
c.sendOK()
}
return nil
}
@@ -281,6 +296,7 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte {
// Used to treat map as efficient set
type empty struct{}
var needFlush = empty{}
func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
@@ -321,6 +337,9 @@ func (c *client) processMsg(msg []byte) {
if c.srv == nil {
return
}
if c.opts.Verbose {
c.sendOK()
}
scratch := [512]byte{}
msgh := scratch[:0]

View File

@@ -26,6 +26,8 @@ type serverInfo struct {
func createClientAsync(ch chan *client, s *Server, cli net.Conn) {
go func() {
c := s.createClient(cli)
// Must be here to suppress +OK
c.opts.Verbose = false
ch <- c
}()
}
@@ -84,7 +86,7 @@ func TestClientCreateAndInfo(t *testing.T) {
func TestClientConnect(t *testing.T) {
_, c, _ := setupClient()
// Basic Connect
// Basic Connect setting flags
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n")
err := c.parse(connectOp)
if err != nil {

View File

@@ -61,7 +61,7 @@ func TestParsePing(t *testing.T) {
func TestParseConnect(t *testing.T) {
c := dummyClient()
connect := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n")
connect := []byte("CONNECT {\"verbose\":false,\"pedantic\":true,\"ssl_required\":false}\r\n")
err := c.parse(connect)
if err != nil || c.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)

View File

@@ -106,7 +106,7 @@ func (s *Server) AcceptLoop() {
}
func clientConnStr(conn net.Conn) interface{} {
if ip, ok := conn.(*net.TCPConn); ok {
if ip, ok := conn.(*net.TCPConn); ok {
addr := ip.RemoteAddr().(*net.TCPAddr)
return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)}
}
@@ -114,7 +114,7 @@ func clientConnStr(conn net.Conn) interface{} {
}
func (s *Server) createClient(conn net.Conn) *client {
c := &client{srv: s, conn: conn}
c := &client{srv: s, conn: conn, opts: defaultOpts}
c.cid = atomic.AddUint64(&s.gcid, 1)
c.bw = bufio.NewWriterSize(c.conn, defaultBufSize)
c.subs = hashmap.New()
@@ -123,15 +123,14 @@ func (s *Server) createClient(conn net.Conn) *client {
// after we process inbound msgs from our own connection.
c.pcd = make(map[*client]struct{})
if ip, ok := conn.(*net.TCPConn); ok {
ip.SetReadBuffer(32768)
}
Debug("Client connection created", clientConnStr(conn), c.cid)
if ip, ok := conn.(*net.TCPConn); ok {
ip.SetReadBuffer(32768)
}
s.sendInfo(c)
go c.readLoop()
Debug("Client connection created", clientConnStr(conn), c.cid)
return c
}

View File

@@ -4,6 +4,7 @@ package test
import (
"encoding/json"
"fmt"
"net"
"regexp"
"testing"
@@ -14,8 +15,10 @@ import (
var s *natsServer
const PROTO_TEST_PORT = 9922
func TestStartup(t *testing.T) {
s = startServer(t, server.DEFAULT_PORT, "")
s = startServer(t, PROTO_TEST_PORT, "")
}
type sendFun func(string)
@@ -30,7 +33,7 @@ func sendCommand(t tLogger, c net.Conn) sendFun {
// Closure version for easier reading
func expectCommand(t tLogger, c net.Conn) expectFun {
return func(re *regexp.Regexp)([]byte) {
return func(re *regexp.Regexp) []byte {
return expectResult(t, c, re)
}
}
@@ -98,9 +101,12 @@ func expectMsgsCommand(t tLogger, ef expectFun) func(int) [][][]byte {
}
}
var infoRe = regexp.MustCompile(`\AINFO\s+([^\r\n]+)\r\n`)
var pongRe = regexp.MustCompile(`\APONG\r\n`)
var msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n([^\\r\\n]*?)\r\n)+?)`)
var (
infoRe = regexp.MustCompile(`\AINFO\s+([^\r\n]+)\r\n`)
pongRe = regexp.MustCompile(`\APONG\r\n`)
msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n([^\\r\\n]*?)\r\n)+?)`)
okRe = regexp.MustCompile(`\A\+OK\r\n`)
)
const (
SUB_INDEX = 1
@@ -110,9 +116,9 @@ const (
MSG_INDEX = 6
)
func doDefaultConnect(t tLogger, c net.Conn) {
// Basic Connect
sendProto(t, c, "CONNECT {\"verbose\":false,\"pedantic\":false,\"ssl_required\":false}\r\n")
func doConnect(t tLogger, c net.Conn, verbose, pedantic, ssl bool) {
cs := fmt.Sprintf("CONNECT {\"verbose\":%v,\"pedantic\":%v,\"ssl_required\":%v}\r\n", verbose, pedantic, ssl)
sendProto(t, c, cs)
buf := expectResult(t, c, infoRe)
js := infoRe.FindAllSubmatch(buf, 1)[0][1]
var sinfo server.Info
@@ -122,6 +128,11 @@ func doDefaultConnect(t tLogger, c net.Conn) {
}
}
func doDefaultConnect(t tLogger, c net.Conn) {
// Basic Connect
doConnect(t, c, false, false, false)
}
func setupConn(t tLogger, c net.Conn) (sendFun, expectFun) {
doDefaultConnect(t, c)
send := sendCommand(t, c)
@@ -130,7 +141,7 @@ func setupConn(t tLogger, c net.Conn) (sendFun, expectFun) {
}
func TestProtoBasics(t *testing.T) {
c := createClientConn(t, "localhost", server.DEFAULT_PORT)
c := createClientConn(t, "localhost", PROTO_TEST_PORT)
send, expect := setupConn(t, c)
expectMsgs := expectMsgsCommand(t, expect)
defer c.Close()
@@ -152,7 +163,7 @@ func TestProtoBasics(t *testing.T) {
}
func TestUnsubMax(t *testing.T) {
c := createClientConn(t, "localhost", server.DEFAULT_PORT)
c := createClientConn(t, "localhost", PROTO_TEST_PORT)
send, expect := setupConn(t, c)
expectMsgs := expectMsgsCommand(t, expect)
defer c.Close()
@@ -168,7 +179,7 @@ func TestUnsubMax(t *testing.T) {
}
func TestQueueSub(t *testing.T) {
c := createClientConn(t, "localhost", server.DEFAULT_PORT)
c := createClientConn(t, "localhost", PROTO_TEST_PORT)
send, expect := setupConn(t, c)
expectMsgs := expectMsgsCommand(t, expect)
defer c.Close()
@@ -195,7 +206,7 @@ func TestQueueSub(t *testing.T) {
}
func TestMultipleQueueSub(t *testing.T) {
c := createClientConn(t, "localhost", server.DEFAULT_PORT)
c := createClientConn(t, "localhost", PROTO_TEST_PORT)
send, expect := setupConn(t, c)
expectMsgs := expectMsgsCommand(t, expect)
defer c.Close()
@@ -209,7 +220,7 @@ func TestMultipleQueueSub(t *testing.T) {
for i := 0; i < sent; i++ {
send("PUB foo 2\r\nok\r\n")
}
matches := expectMsgs(sent*2)
matches := expectMsgs(sent * 2)
sids := make(map[string]int)
for _, m := range matches {
sids[string(m[SID_INDEX])]++

View File

@@ -23,7 +23,7 @@ type tLogger interface {
Errorf(format string, args ...interface{})
}
func startServer(t tLogger, port uint, other string) *natsServer {
func startServer(t tLogger, port int, other string) *natsServer {
var s natsServer
args := fmt.Sprintf("-p %d %s", port, other)
s.args = strings.Split(args, " ")

56
test/verbose_test.go Normal file
View File

@@ -0,0 +1,56 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package test
import (
"testing"
)
func TestStartupVerbose(t *testing.T) {
s = startServer(t, PROTO_TEST_PORT, "")
}
func TestVerbosePing(t *testing.T) {
c := createClientConn(t, "localhost", PROTO_TEST_PORT)
doConnect(t, c, true, false, false)
send := sendCommand(t, c)
expect := expectCommand(t, c)
// Ping should still be same
send("PING\r\n")
expect(pongRe)
}
func TestVerboseConnect(t *testing.T) {
c := createClientConn(t, "localhost", PROTO_TEST_PORT)
doConnect(t, c, true, false, false)
send := sendCommand(t, c)
expect := expectCommand(t, c)
// Connect
send("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n")
expect(okRe)
}
func TestVerbosePubSub(t *testing.T) {
c := createClientConn(t, "localhost", PROTO_TEST_PORT)
doConnect(t, c, true, false, false)
send := sendCommand(t, c)
expect := expectCommand(t, c)
// Pub
send("PUB foo 2\r\nok\r\n")
expect(okRe)
// Sub
send("SUB foo 1\r\n")
expect(okRe)
// UnSub
send("UNSUB 1\r\n")
expect(okRe)
}
func TestStopServerVerbose(t *testing.T) {
s.stopServer()
}