From b85fd41d18ca939e971998c27f704a6ac5eb50fd Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 29 Nov 2012 18:42:20 -0800 Subject: [PATCH] Added suport for pedantic mode and -ERR proto --- server/client.go | 19 ++++++++-- server/server.go | 2 +- test/pedantic_test.go | 81 +++++++++++++++++++++++++++++++++++++++++++ test/proto_test.go | 1 + 4 files changed, 99 insertions(+), 4 deletions(-) create mode 100644 test/pedantic_test.go diff --git a/server/client.go b/server/client.go index e9a617fe..6b2c6e0b 100644 --- a/server/client.go +++ b/server/client.go @@ -12,6 +12,7 @@ import ( "time" "github.com/apcera/gnatsd/hashmap" + "github.com/apcera/gnatsd/sublist" ) // The size of the bufio reader/writer on top of the socket. @@ -117,6 +118,13 @@ func (c *client) processConnect(arg []byte) error { return err } +func (c *client) sendErr(err string) { + c.mu.Lock() + c.bw.WriteString(fmt.Sprintf("-ERR '%s'\r\n", err)) + c.pcd[c] = needFlush + c.mu.Unlock() +} + func (c *client) sendOK() { c.mu.Lock() c.bw.WriteString("+OK\r\n") @@ -184,6 +192,9 @@ func (c *client) processPub(arg []byte) error { if c.pa.size < 0 { return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg) } + if c.opts.Pedantic && !sublist.IsValidLiteralSubject(c.pa.subject) { + c.sendErr("Invalid Subject") + } return nil } @@ -210,7 +221,7 @@ func splitArg(arg []byte) [][]byte { return args } -func (c *client) processSub(argo []byte) error { +func (c *client) processSub(argo []byte) (err error) { c.traceOp("SUB", argo) // Copy so we do not reference a potentially large buffer arg := make([]byte, len(argo)) @@ -233,10 +244,12 @@ func (c *client) processSub(argo []byte) error { c.mu.Lock() c.subs.Set(sub.sid, sub) if c.srv != nil { - c.srv.sl.Insert(sub.subject, sub) + err = c.srv.sl.Insert(sub.subject, sub) } c.mu.Unlock() - if c.opts.Verbose { + if err != nil{ + c.sendErr("Invalid Subject") + } else if c.opts.Verbose { c.sendOK() } return nil diff --git a/server/server.go b/server/server.go index 4697c3aa..b2e9ec78 100644 --- a/server/server.go +++ b/server/server.go @@ -126,7 +126,7 @@ func (s *Server) createClient(conn net.Conn) *client { Debug("Client connection created", clientConnStr(conn), c.cid) if ip, ok := conn.(*net.TCPConn); ok { - ip.SetReadBuffer(32768) + ip.SetReadBuffer(defaultBufSize) } s.sendInfo(c) diff --git a/test/pedantic_test.go b/test/pedantic_test.go new file mode 100644 index 00000000..775a4072 --- /dev/null +++ b/test/pedantic_test.go @@ -0,0 +1,81 @@ +// Copyright 2012 Apcera Inc. All rights reserved. + +package test + +import ( + "testing" +) + +func TestStartupPedantic(t *testing.T) { + s = startServer(t, PROTO_TEST_PORT, "") +} + +func TestPedanticSub(t *testing.T) { + c := createClientConn(t, "localhost", PROTO_TEST_PORT) + doConnect(t, c, true, true, false) + send := sendCommand(t, c) + expect := expectCommand(t, c) + + // Ping should still be same + send("PING\r\n") + expect(pongRe) + + // Test malformed subjects for SUB + // Sub can contain wildcards, but + // subject must still be legit. + + // Empty terminal token + send("SUB foo. 1\r\n") + expect(errRe) + + // Empty beginning token + send("SUB .foo. 1\r\n") + expect(errRe) + + // Empty middle token + send("SUB foo..bar 1\r\n") + expect(errRe) + + // Bad non-terminal FWC + send("SUB foo.>.bar 1\r\n") + buf := expect(errRe) + + // Check that itr is 'Invalid Subject' + matches := errRe.FindAllSubmatch(buf, -1) + if len(matches) != 1 { + t.Fatal("Wanted one overall match") + } + if string(matches[0][1]) != "'Invalid Subject'" { + t.Fatalf("Expected 'Invalid Subject', got %s", string(matches[0][1])) + } + +} + +func TestPedanticPub(t *testing.T) { + c := createClientConn(t, "localhost", PROTO_TEST_PORT) + doConnect(t, c, true, true, false) + send := sendCommand(t, c) + expect := expectCommand(t, c) + + // Test malformed subjects for PUB + // PUB subjects can not have wildcards + // This will error in pedantic mode + send("PUB foo.* 2\r\nok\r\n") + expect(errRe) + + send("PUB foo.> 2\r\nok\r\n") + expect(errRe) + + send("PUB foo. 2\r\nok\r\n") + expect(errRe) + + send("PUB .foo 2\r\nok\r\n") + expect(errRe) + + send("PUB foo..* 2\r\nok\r\n") + expect(errRe) +} + +func TestStopServerPedantic(t *testing.T) { + s.stopServer() +} diff --git a/test/proto_test.go b/test/proto_test.go index af1e72e7..8020083d 100644 --- a/test/proto_test.go +++ b/test/proto_test.go @@ -106,6 +106,7 @@ var ( pongRe = regexp.MustCompile(`\APONG\r\n`) msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n([^\\r\\n]*?)\r\n)+?)`) okRe = regexp.MustCompile(`\A\+OK\r\n`) + errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`) ) const (