diff --git a/server/client.go b/server/client.go index 2c09f55b..05a19848 100644 --- a/server/client.go +++ b/server/client.go @@ -544,6 +544,13 @@ func (c *client) initClient() { // Snapshots to avoid mutex access in fast paths. c.out.wdl = opts.WriteDeadline c.out.mp = opts.MaxPending + // Snapshot max control line since currently can not be changed on reload and we + // were checking it on each call to parse. If this changes and we allow MaxControlLine + // to be reloaded without restart, this code will need to change. + c.mcl = int32(opts.MaxControlLine) + if c.mcl == 0 { + c.mcl = MAX_CONTROL_LINE_SIZE + } c.subs = make(map[string]*subscription) c.echo = true @@ -1005,15 +1012,6 @@ func (c *client) readLoop(pre []byte) { c.mqtt.r = &mqttReader{reader: nc} } c.in.rsz = startBufSize - // Snapshot max control line since currently can not be changed on reload and we - // were checking it on each call to parse. If this changes and we allow MaxControlLine - // to be reloaded without restart, this code will need to change. - c.mcl = MAX_CONTROL_LINE_SIZE - if s != nil { - if opts := s.getOpts(); opts != nil { - c.mcl = int32(opts.MaxControlLine) - } - } // Check the per-account-cache for closed subscriptions cpacc := c.kind == ROUTER || c.kind == GATEWAY diff --git a/server/const.go b/server/const.go index 7e4a0d74..19dd9b02 100644 --- a/server/const.go +++ b/server/const.go @@ -124,6 +124,9 @@ const ( // PROTO_SNIPPET_SIZE is the default size of proto to print on parse errors. PROTO_SNIPPET_SIZE = 32 + // MAX_CONTOL_LINE_SNIPPET_SIZE is the default size of proto to print on max control line errors. + MAX_CONTOL_LINE_SNIPPET_SIZE = 128 + // MAX_MSG_ARGS Maximum possible number of arguments from MSG proto. MAX_MSG_ARGS = 4 diff --git a/server/log_test.go b/server/log_test.go index 19c0aa97..edead35a 100644 --- a/server/log_test.go +++ b/server/log_test.go @@ -251,6 +251,7 @@ func TestNoPasswordsFromConnectTrace(t *testing.T) { opts.Username = "derek" opts.Password = "s3cr3t" opts.PingInterval = 2 * time.Minute + setBaselineOptions(opts) s := &Server{opts: opts} dl := &DummyLogger{} s.SetLogger(dl, false, true) diff --git a/server/parser.go b/server/parser.go index 1bccfd07..331ddf70 100644 --- a/server/parser.go +++ b/server/parser.go @@ -259,6 +259,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if err := c.overMaxControlLineLimit(arg, mcl); err != nil { + return err + } if trace { c.traceInOp("HPUB", arg) } @@ -319,6 +322,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if err := c.overMaxControlLineLimit(arg, mcl); err != nil { + return err + } var err error if c.kind == ROUTER || c.kind == GATEWAY { if trace { @@ -391,6 +397,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if err := c.overMaxControlLineLimit(arg, mcl); err != nil { + return err + } if trace { c.traceInOp("PUB", arg) } @@ -508,6 +517,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if err := c.overMaxControlLineLimit(arg, mcl); err != nil { + return err + } if trace { c.traceInOp("A+", arg) } @@ -547,6 +559,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if err := c.overMaxControlLineLimit(arg, mcl); err != nil { + return err + } if trace { c.traceInOp("A-", arg) } @@ -598,6 +613,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if err := c.overMaxControlLineLimit(arg, mcl); err != nil { + return err + } var err error switch c.kind { @@ -730,6 +748,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if err := c.overMaxControlLineLimit(arg, mcl); err != nil { + return err + } var err error switch c.kind { @@ -876,6 +897,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if err := c.overMaxControlLineLimit(arg, mcl); err != nil { + return err + } if trace { c.traceInOp("CONNECT", removePassFromTrace(arg)) } @@ -934,6 +958,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if err := c.overMaxControlLineLimit(arg, mcl); err != nil { + return err + } var err error if c.kind == ROUTER || c.kind == GATEWAY { switch c.op { @@ -1010,6 +1037,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if err := c.overMaxControlLineLimit(arg, mcl); err != nil { + return err + } if err := c.processInfo(arg); err != nil { return err } @@ -1086,6 +1116,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if err := c.overMaxControlLineLimit(arg, mcl); err != nil { + return err + } c.processErr(string(arg)) c.drop, c.as, c.state = 0, i+1, OP_START default: @@ -1113,11 +1146,7 @@ func (c *client) parse(buf []byte) error { // Check for violations of control line length here. Note that this is not // exact at all but the performance hit is too great to be precise, and // catching here should prevent memory exhaustion attacks. - if len(c.argBuf) > int(mcl) { - err := NewErrorCtx(ErrMaxControlLine, "State %d, max_control_line %d, Buffer len %d", - c.state, int(mcl), len(c.argBuf)) - c.sendErr(err.Error()) - c.closeConnection(MaxControlLineExceeded) + if err := c.overMaxControlLineLimit(c.argBuf, mcl); err != nil { return err } } @@ -1160,13 +1189,13 @@ authErr: parseErr: c.sendErr("Unknown Protocol Operation") - snip := protoSnippet(i, buf) + snip := protoSnippet(i, PROTO_SNIPPET_SIZE, buf) err := fmt.Errorf("%s parser ERROR, state=%d, i=%d: proto='%s...'", c.typeString(), c.state, i, snip) return err } -func protoSnippet(start int, buf []byte) string { - stop := start + PROTO_SNIPPET_SIZE +func protoSnippet(start, max int, buf []byte) string { + stop := start + max bufSize := len(buf) if start >= bufSize { return `""` @@ -1177,6 +1206,23 @@ func protoSnippet(start int, buf []byte) string { return fmt.Sprintf("%q", buf[start:stop]) } +// Check if the length of buffer `arg` is over the max control line limit `mcl`. +// If so, an error is sent to the client and the connection is closed. +// The error ErrMaxControlLine is returned. +func (c *client) overMaxControlLineLimit(arg []byte, mcl int32) error { + if c.kind != CLIENT { + return nil + } + if len(arg) > int(mcl) { + err := NewErrorCtx(ErrMaxControlLine, "State %d, max_control_line %d, Buffer len %d (snip: %s...)", + c.state, int(mcl), len(c.argBuf), protoSnippet(0, MAX_CONTOL_LINE_SNIPPET_SIZE, arg)) + c.sendErr(err.Error()) + c.closeConnection(MaxControlLineExceeded) + return err + } + return nil +} + // clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but // we need to hold onto it into the next read. func (c *client) clonePubArg(lmsg bool) error { diff --git a/server/parser_test.go b/server/parser_test.go index 00ff79ea..c92350bc 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -775,7 +775,7 @@ func TestProtoSnippet(t *testing.T) { } for _, tt := range tests { - got := protoSnippet(tt.input, sample) + got := protoSnippet(tt.input, PROTO_SNIPPET_SIZE, sample) if tt.expected != got { t.Errorf("Expected protocol snippet to be %s when start=%d but got %s\n", tt.expected, tt.input, got) } @@ -811,12 +811,60 @@ func TestParseOK(t *testing.T) { } func TestMaxControlLine(t *testing.T) { - c := dummyClient() - c.mcl = 8 + for _, test := range []struct { + name string + kind int + shouldFail bool + }{ + {"client", CLIENT, true}, + {"leaf", LEAF, false}, + {"route", ROUTER, false}, + {"gateway", GATEWAY, false}, + } { + t.Run(test.name, func(t *testing.T) { + pub := []byte("PUB foo.bar.baz 2\r\nok\r\n") - pub := []byte("PUB foo.bar 11\r") - err := c.parse(pub) - if !ErrorIs(err, ErrMaxControlLine) { - t.Fatalf("Expected an error parsing longer than expected control line") + setupClient := func() *client { + c := dummyClient() + c.setNoReconnect() + c.flags.set(connectReceived) + c.kind = test.kind + if test.kind == GATEWAY { + c.gw = &gateway{outbound: false, connected: true, insim: make(map[string]*insie)} + } + c.mcl = 8 + return c + } + + c := setupClient() + // First try with a partial: + // PUB foo.bar.baz 2\r\nok\r\n + // .............^ + err := c.parse(pub[:14]) + switch test.shouldFail { + case true: + if !ErrorIs(err, ErrMaxControlLine) { + t.Fatalf("Expected an error parsing longer than expected control line") + } + case false: + if err != nil { + t.Fatalf("Should not have failed, got %v", err) + } + } + + // Now with full protocol (no split) and we should still enforce. + c = setupClient() + err = c.parse(pub) + switch test.shouldFail { + case true: + if !ErrorIs(err, ErrMaxControlLine) { + t.Fatalf("Expected an error parsing longer than expected control line") + } + case false: + if err != nil { + t.Fatalf("Should not have failed, got %v", err) + } + } + }) } }