diff --git a/server/errors.go b/server/errors.go index 3354d3f3..b8ebb11b 100644 --- a/server/errors.go +++ b/server/errors.go @@ -17,6 +17,9 @@ var ( // ErrMaxPayload represents an error condition when the payload is too big. ErrMaxPayload = errors.New("Maximum Payload Exceeded") + // ErrMaxControlLine represents an error condition when the control line is too big. + ErrMaxControlLine = errors.New("Maximum Control Line Exceeded") + // ErrReservedPublishSubject represents an error condition when sending to a reserved subject, e.g. _SYS.> ErrReservedPublishSubject = errors.New("Reserved Internal Subject") ) diff --git a/server/parser.go b/server/parser.go index d61e404e..f64d70fe 100644 --- a/server/parser.go +++ b/server/parser.go @@ -85,6 +85,8 @@ func (c *client) parse(buf []byte) error { var i int var b byte + mcl := c.srv.opts.MaxControlLine + // snapshot this, and reset when we receive a // proper CONNECT if needed. authSet := c.isAuthTimerSet() @@ -169,7 +171,7 @@ func (c *client) parse(buf []byte) error { if err := c.processPub(arg); err != nil { return err } - c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD + c.drop, c.as, c.state = OP_START, i+1, MSG_PAYLOAD // If we don't have a saved buffer then jump ahead with // the index. If this overruns what is left we fall out // and process split buffer. @@ -631,14 +633,26 @@ func (c *client) parse(buf []byte) error { goto parseErr } } + // Check for split buffer scenarios for any ARG state. - if (c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG || + if c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG || c.state == MSG_ARG || c.state == MINUS_ERR_ARG || - c.state == CONNECT_ARG || c.state == INFO_ARG) && c.argBuf == nil { - c.argBuf = c.scratch[:0] - c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...) - // FIXME(dlc), check max control line len + c.state == CONNECT_ARG || c.state == INFO_ARG { + // Setup a holder buffer to deal with split buffer scenario. + if c.argBuf == nil { + c.argBuf = c.scratch[:0] + c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...) + } + // Check for violations of control line length here. Note that this is not + // exact at all but the performance hit is too great to be precise, and + // catching here should prevent memory exhaustion attacks. + if len(c.argBuf) > mcl { + c.sendErr("Maximum Control Line Exceeded") + c.closeConnection() + return ErrMaxControlLine + } } + // Check for split msg if (c.state == MSG_PAYLOAD || c.state == MSG_END) && c.msgBuf == nil { // We need to clone the pubArg if it is still referencing the diff --git a/test/proto_test.go b/test/proto_test.go index 26950e35..c58c8257 100644 --- a/test/proto_test.go +++ b/test/proto_test.go @@ -264,3 +264,20 @@ func TestIncompletePubArg(t *testing.T) { send(pub + goodBuf + pub + goodBuf + pub + badBuf + pub + badBuf2) expect(errRe) } + +func TestControlLineMaximums(t *testing.T) { + s := runProtoServer() + defer s.Shutdown() + + c := createClientConn(t, "localhost", PROTO_TEST_PORT) + defer c.Close() + + send, expect := setupConn(t, c) + + pubTooLong := "PUB foo " + for i := 0; i < 32; i++ { + pubTooLong += "2222222222" + } + send(pubTooLong) + expect(errRe) +} diff --git a/test/test.go b/test/test.go index 3d11fb45..e475129c 100644 --- a/test/test.go +++ b/test/test.go @@ -34,10 +34,11 @@ type tLogger interface { // DefaultTestOptions are default options for the unit tests. var DefaultTestOptions = server.Options{ - Host: "localhost", - Port: 4222, - NoLog: true, - NoSigs: true, + Host: "localhost", + Port: 4222, + NoLog: true, + NoSigs: true, + MaxControlLine: 256, } // RunDefaultServer starts a new Go routine based server using the default options