diff --git a/server/client.go b/server/client.go index 87c09476..e9a617fe 100644 --- a/server/client.go +++ b/server/client.go @@ -56,13 +56,14 @@ type clientOpts struct { SslRequired bool `json:"ssl_required"` } +var defaultOpts = clientOpts{true, true, false} + func init() { rand.Seed(time.Now().UnixNano()) } func (c *client) readLoop() { b := make([]byte, defaultBufSize) - // log.Printf("b len = %d, cap = %d\n", len(b), cap(b)) for { n, err := c.conn.Read(b) if err != nil { @@ -109,10 +110,19 @@ func (c *client) traceOp(op string, arg []byte) { func (c *client) processConnect(arg []byte) error { c.traceOp("CONNECT", arg) // FIXME, check err - return json.Unmarshal(arg, &c.opts) + err := json.Unmarshal(arg, &c.opts) + if c.opts.Verbose { + c.sendOK() + } + return err } -var pongResp = []byte(fmt.Sprintf("PONG%s", CR_LF)) +func (c *client) sendOK() { + c.mu.Lock() + c.bw.WriteString("+OK\r\n") + c.pcd[c] = needFlush + c.mu.Unlock() +} func (c *client) processPing() { c.traceOp("PING", nil) @@ -120,7 +130,7 @@ func (c *client) processPing() { return } c.mu.Lock() - c.bw.Write(pongResp) + c.bw.WriteString("PONG\r\n") err := c.bw.Flush() c.mu.Unlock() if err != nil { @@ -221,12 +231,14 @@ func (c *client) processSub(argo []byte) error { } c.mu.Lock() - defer c.mu.Unlock() - c.subs.Set(sub.sid, sub) if c.srv != nil { c.srv.sl.Insert(sub.subject, sub) } + c.mu.Unlock() + if c.opts.Verbose { + c.sendOK() + } return nil } @@ -264,6 +276,9 @@ func (c *client) processUnsub(arg []byte) error { } c.unsubscribe(sub) } + if c.opts.Verbose { + c.sendOK() + } return nil } @@ -281,6 +296,7 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte { // Used to treat map as efficient set type empty struct{} + var needFlush = empty{} func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { @@ -321,6 +337,9 @@ func (c *client) processMsg(msg []byte) { if c.srv == nil { return } + if c.opts.Verbose { + c.sendOK() + } scratch := [512]byte{} msgh := scratch[:0] diff --git a/server/client_test.go b/server/client_test.go index ad196933..d0daa3c6 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -26,6 +26,8 @@ type serverInfo struct { func createClientAsync(ch chan *client, s *Server, cli net.Conn) { go func() { c := s.createClient(cli) + // Must be here to suppress +OK + c.opts.Verbose = false ch <- c }() } @@ -84,7 +86,7 @@ func TestClientCreateAndInfo(t *testing.T) { func TestClientConnect(t *testing.T) { _, c, _ := setupClient() - // Basic Connect + // Basic Connect setting flags connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n") err := c.parse(connectOp) if err != nil { diff --git a/server/parser_test.go b/server/parser_test.go index e3c15926..8b653707 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -61,7 +61,7 @@ func TestParsePing(t *testing.T) { func TestParseConnect(t *testing.T) { c := dummyClient() - connect := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n") + connect := []byte("CONNECT {\"verbose\":false,\"pedantic\":true,\"ssl_required\":false}\r\n") err := c.parse(connect) if err != nil || c.state != OP_START { t.Fatalf("Unexpected: %d : %v\n", c.state, err) diff --git a/server/server.go b/server/server.go index a2f6a8c1..4697c3aa 100644 --- a/server/server.go +++ b/server/server.go @@ -106,7 +106,7 @@ func (s *Server) AcceptLoop() { } func clientConnStr(conn net.Conn) interface{} { - if ip, ok := conn.(*net.TCPConn); ok { + if ip, ok := conn.(*net.TCPConn); ok { addr := ip.RemoteAddr().(*net.TCPAddr) return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)} } @@ -114,7 +114,7 @@ func clientConnStr(conn net.Conn) interface{} { } func (s *Server) createClient(conn net.Conn) *client { - c := &client{srv: s, conn: conn} + c := &client{srv: s, conn: conn, opts: defaultOpts} c.cid = atomic.AddUint64(&s.gcid, 1) c.bw = bufio.NewWriterSize(c.conn, defaultBufSize) c.subs = hashmap.New() @@ -123,15 +123,14 @@ func (s *Server) createClient(conn net.Conn) *client { // after we process inbound msgs from our own connection. c.pcd = make(map[*client]struct{}) - if ip, ok := conn.(*net.TCPConn); ok { - ip.SetReadBuffer(32768) - } + Debug("Client connection created", clientConnStr(conn), c.cid) + + if ip, ok := conn.(*net.TCPConn); ok { + ip.SetReadBuffer(32768) + } s.sendInfo(c) go c.readLoop() - - Debug("Client connection created", clientConnStr(conn), c.cid) - return c } diff --git a/test/proto_test.go b/test/proto_test.go index 8fc49c1e..af1e72e7 100644 --- a/test/proto_test.go +++ b/test/proto_test.go @@ -4,6 +4,7 @@ package test import ( "encoding/json" + "fmt" "net" "regexp" "testing" @@ -14,8 +15,10 @@ import ( var s *natsServer +const PROTO_TEST_PORT = 9922 + func TestStartup(t *testing.T) { - s = startServer(t, server.DEFAULT_PORT, "") + s = startServer(t, PROTO_TEST_PORT, "") } type sendFun func(string) @@ -30,7 +33,7 @@ func sendCommand(t tLogger, c net.Conn) sendFun { // Closure version for easier reading func expectCommand(t tLogger, c net.Conn) expectFun { - return func(re *regexp.Regexp)([]byte) { + return func(re *regexp.Regexp) []byte { return expectResult(t, c, re) } } @@ -98,9 +101,12 @@ func expectMsgsCommand(t tLogger, ef expectFun) func(int) [][][]byte { } } -var infoRe = regexp.MustCompile(`\AINFO\s+([^\r\n]+)\r\n`) -var pongRe = regexp.MustCompile(`\APONG\r\n`) -var msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n([^\\r\\n]*?)\r\n)+?)`) +var ( + infoRe = regexp.MustCompile(`\AINFO\s+([^\r\n]+)\r\n`) + pongRe = regexp.MustCompile(`\APONG\r\n`) + msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n([^\\r\\n]*?)\r\n)+?)`) + okRe = regexp.MustCompile(`\A\+OK\r\n`) +) const ( SUB_INDEX = 1 @@ -110,9 +116,9 @@ const ( MSG_INDEX = 6 ) -func doDefaultConnect(t tLogger, c net.Conn) { - // Basic Connect - sendProto(t, c, "CONNECT {\"verbose\":false,\"pedantic\":false,\"ssl_required\":false}\r\n") +func doConnect(t tLogger, c net.Conn, verbose, pedantic, ssl bool) { + cs := fmt.Sprintf("CONNECT {\"verbose\":%v,\"pedantic\":%v,\"ssl_required\":%v}\r\n", verbose, pedantic, ssl) + sendProto(t, c, cs) buf := expectResult(t, c, infoRe) js := infoRe.FindAllSubmatch(buf, 1)[0][1] var sinfo server.Info @@ -122,6 +128,11 @@ func doDefaultConnect(t tLogger, c net.Conn) { } } +func doDefaultConnect(t tLogger, c net.Conn) { + // Basic Connect + doConnect(t, c, false, false, false) +} + func setupConn(t tLogger, c net.Conn) (sendFun, expectFun) { doDefaultConnect(t, c) send := sendCommand(t, c) @@ -130,7 +141,7 @@ func setupConn(t tLogger, c net.Conn) (sendFun, expectFun) { } func TestProtoBasics(t *testing.T) { - c := createClientConn(t, "localhost", server.DEFAULT_PORT) + c := createClientConn(t, "localhost", PROTO_TEST_PORT) send, expect := setupConn(t, c) expectMsgs := expectMsgsCommand(t, expect) defer c.Close() @@ -152,7 +163,7 @@ func TestProtoBasics(t *testing.T) { } func TestUnsubMax(t *testing.T) { - c := createClientConn(t, "localhost", server.DEFAULT_PORT) + c := createClientConn(t, "localhost", PROTO_TEST_PORT) send, expect := setupConn(t, c) expectMsgs := expectMsgsCommand(t, expect) defer c.Close() @@ -168,7 +179,7 @@ func TestUnsubMax(t *testing.T) { } func TestQueueSub(t *testing.T) { - c := createClientConn(t, "localhost", server.DEFAULT_PORT) + c := createClientConn(t, "localhost", PROTO_TEST_PORT) send, expect := setupConn(t, c) expectMsgs := expectMsgsCommand(t, expect) defer c.Close() @@ -195,7 +206,7 @@ func TestQueueSub(t *testing.T) { } func TestMultipleQueueSub(t *testing.T) { - c := createClientConn(t, "localhost", server.DEFAULT_PORT) + c := createClientConn(t, "localhost", PROTO_TEST_PORT) send, expect := setupConn(t, c) expectMsgs := expectMsgsCommand(t, expect) defer c.Close() @@ -209,7 +220,7 @@ func TestMultipleQueueSub(t *testing.T) { for i := 0; i < sent; i++ { send("PUB foo 2\r\nok\r\n") } - matches := expectMsgs(sent*2) + matches := expectMsgs(sent * 2) sids := make(map[string]int) for _, m := range matches { sids[string(m[SID_INDEX])]++ diff --git a/test/test.go b/test/test.go index ec1709c9..9e566aac 100644 --- a/test/test.go +++ b/test/test.go @@ -23,7 +23,7 @@ type tLogger interface { Errorf(format string, args ...interface{}) } -func startServer(t tLogger, port uint, other string) *natsServer { +func startServer(t tLogger, port int, other string) *natsServer { var s natsServer args := fmt.Sprintf("-p %d %s", port, other) s.args = strings.Split(args, " ") diff --git a/test/verbose_test.go b/test/verbose_test.go new file mode 100644 index 00000000..1c199433 --- /dev/null +++ b/test/verbose_test.go @@ -0,0 +1,56 @@ +// Copyright 2012 Apcera Inc. All rights reserved. + +package test + +import ( + "testing" +) + +func TestStartupVerbose(t *testing.T) { + s = startServer(t, PROTO_TEST_PORT, "") +} + +func TestVerbosePing(t *testing.T) { + c := createClientConn(t, "localhost", PROTO_TEST_PORT) + doConnect(t, c, true, false, false) + send := sendCommand(t, c) + expect := expectCommand(t, c) + + // Ping should still be same + send("PING\r\n") + expect(pongRe) +} + +func TestVerboseConnect(t *testing.T) { + c := createClientConn(t, "localhost", PROTO_TEST_PORT) + doConnect(t, c, true, false, false) + send := sendCommand(t, c) + expect := expectCommand(t, c) + + // Connect + send("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n") + expect(okRe) +} + +func TestVerbosePubSub(t *testing.T) { + c := createClientConn(t, "localhost", PROTO_TEST_PORT) + doConnect(t, c, true, false, false) + send := sendCommand(t, c) + expect := expectCommand(t, c) + + // Pub + send("PUB foo 2\r\nok\r\n") + expect(okRe) + + // Sub + send("SUB foo 1\r\n") + expect(okRe) + + // UnSub + send("UNSUB 1\r\n") + expect(okRe) +} + +func TestStopServerVerbose(t *testing.T) { + s.stopServer() +}