// Copyright 2012 Apcera Inc. All rights reserved. package server import ( "bufio" "bytes" "encoding/json" "net" "reflect" "regexp" "strings" "testing" ) type serverInfo struct { Id string `json:"server_id"` Host string `json:"host"` Port uint `json:"port"` Version string `json:"version"` AuthRequired bool `json:"auth_required"` SslRequired bool `json:"ssl_required"` MaxPayload int64 `json:"max_payload"` } func createClientAsync(ch chan *client, s *Server, cli net.Conn) { go func() { c := s.createClient(cli) // Must be here to suppress +OK c.opts.Verbose = false ch <- c }() } func rawSetup() (*Server, *client, *bufio.Reader, string) { cli, srv := net.Pipe() cr := bufio.NewReaderSize(cli, defaultBufSize) s := New(Options{}) ch := make(chan *client) createClientAsync(ch, s, srv) l, _ := cr.ReadString('\n') // Grab client c := <-ch return s, c, cr, l } func setUpClientWithResponse() (*client, string) { _, c, _, l := rawSetup() return c, l } func setupClient() (*Server, *client, *bufio.Reader) { s, c, cr, _ := rawSetup() return s, c, cr } func TestClientCreateAndInfo(t *testing.T) { c, l := setUpClientWithResponse() if c.cid != 1 { t.Fatalf("Expected cid of 1 vs %d\n", c.cid) } if c.state != OP_START { t.Fatal("Expected state to be OP_START") } if !strings.HasPrefix(l, "INFO ") { t.Fatalf("INFO response incorrect: %s\n", l) } // Make sure payload is proper json var info serverInfo err := json.Unmarshal([]byte(l[5:]), &info) if err != nil { t.Fatalf("Could not parse INFO json: %v\n", err) } // Sanity checks if info.MaxPayload != MAX_PAYLOAD_SIZE || info.AuthRequired || info.SslRequired || info.Port != DEFAULT_PORT { t.Fatalf("INFO inconsistent: %+v\n", info) } } func TestClientConnect(t *testing.T) { _, c, _ := setupClient() // Basic Connect setting flags connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n") err := c.parse(connectOp) if err != nil { t.Fatalf("Received error: %v\n", err) } if c.state != OP_START { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } // Test that we can capture user/pass connectOp = []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\"}\r\n") c.opts = defaultOpts err = c.parse(connectOp) if err != nil { t.Fatalf("Received error: %v\n", err) } if c.state != OP_START { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true, Username:"derek", Password:"foo"}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } // Test that we can capture client name connectOp = []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\",\"name\":\"router\"}\r\n") c.opts = defaultOpts err = c.parse(connectOp) if err != nil { t.Fatalf("Received error: %v\n", err) } if c.state != OP_START { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true, Username:"derek", Password:"foo", Name:"router"}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } // Test that we correctly capture auth tokens connectOp = []byte("CONNECT {\"auth_token\":\"YZZ222\",\"name\":\"router\"}\r\n") c.opts = defaultOpts err = c.parse(connectOp) if err != nil { t.Fatalf("Received error: %v\n", err) } if c.state != OP_START { t.Fatalf("Expected state of OP_START vs %d\n", c.state) } if !reflect.DeepEqual(c.opts, clientOpts{Verbose:true, Pedantic:true, Authorization:"YZZ222", Name:"router"}) { t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts) } } func TestClientPing(t *testing.T) { _, c, cr := setupClient() // PING pingOp := []byte("PING\r\n") go c.parse(pingOp) l, err := cr.ReadString('\n') if err != nil { t.Fatalf("Error receiving info from server: %v\n", err) } if !strings.HasPrefix(l, "PONG\r\n") { t.Fatalf("PONG response incorrect: %s\n", l) } } var msgPat = regexp.MustCompile(`\AMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n`) const ( SUB_INDEX = 1 SID_INDEX = 2 REPLY_INDEX = 4 LEN_INDEX = 5 ) func checkPayload(cr *bufio.Reader, expected []byte, t *testing.T) { // Read in payload d := make([]byte, len(expected)) n, err := cr.Read(d) if err != nil { t.Fatalf("Error receiving msg payload from server: %v\n", err) } if n != len(expected) { t.Fatalf("Did not read correct amount of bytes: %d vs %d\n", n, len(expected)) } if !bytes.Equal(d, expected) { t.Fatalf("Did not read correct payload:: <%s>\n", d) } } func TestClientSimplePubSub(t *testing.T) { _, c, cr := setupClient() // SUB/PUB go c.parse([]byte("SUB foo 1\r\nPUB foo 5\r\nhello\r\nPING\r\n")) l, err := cr.ReadString('\n') if err != nil { t.Fatalf("Error receiving msg from server: %v\n", err) } matches := msgPat.FindAllStringSubmatch(l, -1)[0] if len(matches) != 6 { t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6) } if matches[SUB_INDEX] != "foo" { t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) } if matches[LEN_INDEX] != "5" { t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX]) } checkPayload(cr, []byte("hello\r\n"), t) } func TestClientSimplePubSubWithReply(t *testing.T) { _, c, cr := setupClient() // SUB/PUB go c.parse([]byte("SUB foo 1\r\nPUB foo bar 5\r\nhello\r\nPING\r\n")) l, err := cr.ReadString('\n') if err != nil { t.Fatalf("Error receiving msg from server: %v\n", err) } matches := msgPat.FindAllStringSubmatch(l, -1)[0] if len(matches) != 6 { t.Fatalf("Did not get correct # matches: %d vs %d\n", len(matches), 6) } if matches[SUB_INDEX] != "foo" { t.Fatalf("Did not get correct subject: '%s'\n", matches[SUB_INDEX]) } if matches[SID_INDEX] != "1" { t.Fatalf("Did not get correct sid: '%s'\n", matches[SID_INDEX]) } if matches[REPLY_INDEX] != "bar" { t.Fatalf("Did not get correct reply subject: '%s'\n", matches[REPLY_INDEX]) } if matches[LEN_INDEX] != "5" { t.Fatalf("Did not get correct msg length: '%s'\n", matches[LEN_INDEX]) } } func TestClientPubWithQueueSub(t *testing.T) { _, c, cr := setupClient() num := 10 // Queue SUB/PUB subs := []byte("SUB foo g1 1\r\nSUB foo g1 2\r\n") pubs := []byte("PUB foo bar 5\r\nhello\r\n") op := []byte{} op = append(op, subs...) for i := 0; i < num; i++ { op = append(op, pubs...) } go func() { c.parse(op) c.conn.Close() }() var n1, n2, received int for ; ; received += 1 { l, err := cr.ReadString('\n') if err != nil { break } matches := msgPat.FindAllStringSubmatch(l,-1)[0] // Count which sub switch matches[SID_INDEX] { case "1": n1++ case "2": n2++ } checkPayload(cr, []byte("hello\r\n"), t) } if received != num { t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, num) } // Threshold for randomness for now if n1 < 2 || n2 < 2 { t.Fatalf("Received wrong # of msgs per subscriber: %d - %d\n", n1, n2) } } func TestClientUnSub(t *testing.T) { _, c, cr := setupClient() num := 1 // SUB/PUB subs := []byte("SUB foo 1\r\nSUB foo 2\r\n") unsub := []byte("UNSUB 1\r\n") pub := []byte("PUB foo bar 5\r\nhello\r\n") op := []byte{} op = append(op, subs...) op = append(op, unsub...) op = append(op, pub...) go func() { c.parse(op) c.conn.Close() }() var received int for ; ; received += 1 { l, err := cr.ReadString('\n') if err != nil { break } matches := msgPat.FindAllStringSubmatch(l, -1)[0] if matches[SID_INDEX] != "2" { t.Fatalf("Received msg on unsubscribed subscription!\n") } checkPayload(cr, []byte("hello\r\n"), t) } if received != num { t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, num) } } func TestClientUnSubMax(t *testing.T) { _, c, cr := setupClient() num := 10 exp := 5 // SUB/PUB subs := []byte("SUB foo 1\r\n") unsub := []byte("UNSUB 1 5\r\n") pub := []byte("PUB foo bar 5\r\nhello\r\n") op := []byte{} op = append(op, subs...) op = append(op, unsub...) for i := 0; i < num; i++ { op = append(op, pub...) } go func() { c.parse(op) c.conn.Close() }() var received int for ; ; received += 1 { l, err := cr.ReadString('\n') if err != nil { break } matches := msgPat.FindAllStringSubmatch(l, -1)[0] if matches[SID_INDEX] != "1" { t.Fatalf("Received msg on unsubscribed subscription!\n") } checkPayload(cr, []byte("hello\r\n"), t) } if received != exp { t.Fatalf("Received wrong # of msgs: %d vs %d\n", received, exp) } } func TestClientRemoveSubsOnDisconnect(t *testing.T) { s, c, _ := setupClient() subs := []byte("SUB foo 1\r\nSUB bar 2\r\n") ch := make(chan bool) go func() { c.parse(subs) ch <- true }() <-ch if s.sl.Count() != 2 { t.Fatalf("Should have 2 subscriptions, got %d\n", s.sl.Count()) } c.closeConnection() if s.sl.Count() != 0 { t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count()) } }