From d51566881eb1e08b5420c8726cccb8b6ab5bc2d2 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 30 Apr 2020 17:05:37 -0700 Subject: [PATCH] First pass at headers awareness for server Signed-off-by: Derek Collison --- server/client.go | 65 +++++++++++- server/client_test.go | 33 +++++++ server/const.go | 3 + server/errors.go | 9 +- server/monitor.go | 2 + server/opts.go | 1 + server/parser.go | 110 ++++++++++++++++++--- server/parser_test.go | 224 ++++++++++++++++++++++++++++++------------ server/server.go | 3 + 9 files changed, 370 insertions(+), 80 deletions(-) diff --git a/server/client.go b/server/client.go index 47c4de52..ba0ce263 100644 --- a/server/client.go +++ b/server/client.go @@ -172,6 +172,7 @@ const ( MissingAccount Revocation InternalClient + MsgHeaderViolation ) // Some flags passed to processMsgResultsEx @@ -220,6 +221,7 @@ type client struct { msgb [msgScratchSize]byte last time.Time parseState + headers bool rtt time.Duration rttStart time.Time @@ -1841,6 +1843,68 @@ func (c *client) processPong() { } } +// Header pubs take form HPUB [reply] \r\n +func (c *client) processHeaderPub(arg []byte) error { + // Unroll splitArgs to avoid runtime/heap issues + a := [MAX_HPUB_ARGS][]byte{} + args := a[:0] + start := -1 + for i, b := range arg { + switch b { + case ' ', '\t': + if start >= 0 { + args = append(args, arg[start:i]) + start = -1 + } + default: + if start < 0 { + start = i + } + } + } + if start >= 0 { + args = append(args, arg[start:]) + } + + c.pa.arg = arg + switch len(args) { + case 3: + c.pa.subject = args[0] + c.pa.reply = nil + c.pa.hdr = parseSize(args[1]) + c.pa.size = parseSize(args[2]) + c.pa.szb = args[2] + case 4: + c.pa.subject = args[0] + c.pa.reply = args[1] + c.pa.hdr = parseSize(args[2]) + c.pa.size = parseSize(args[3]) + c.pa.szb = args[3] + default: + return fmt.Errorf("processHeaderPub Parse Error: '%s'", arg) + } + if c.pa.hdr < 0 { + return fmt.Errorf("processHeaderPub Bad or Missing Header Size: '%s'", arg) + } + // If number overruns an int64, parseSize() will have returned a negative value + if c.pa.size < 0 { + return fmt.Errorf("processHeaderPub Bad or Missing Total Size: '%s'", arg) + } + if c.pa.hdr > c.pa.size { + return fmt.Errorf("processHeaderPub Header Size larger then TotalSize: '%s'", arg) + } + maxPayload := atomic.LoadInt32(&c.mpay) + // Use int64() to avoid int32 overrun... + if maxPayload != jwt.NoLimit && int64(c.pa.size) > int64(maxPayload) { + c.maxPayloadViolation(c.pa.size, maxPayload) + return ErrMaxPayload + } + if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) { + c.sendErr("Invalid Publish Subject") + } + return nil +} + func (c *client) processPub(arg []byte) error { // Unroll splitArgs to avoid runtime/heap issues a := [MAX_PUB_ARGS][]byte{} @@ -1888,7 +1952,6 @@ func (c *client) processPub(arg []byte) error { c.maxPayloadViolation(c.pa.size, maxPayload) return ErrMaxPayload } - if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) { c.sendErr("Invalid Publish Subject") } diff --git a/server/client_test.go b/server/client_test.go index dc9dee70..9e78bc11 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -43,6 +43,7 @@ type serverInfo struct { AuthRequired bool `json:"auth_required"` TLSRequired bool `json:"tls_required"` MaxPayload int64 `json:"max_payload"` + Headers bool `json:"headers"` } type testAsyncClient struct { @@ -190,6 +191,38 @@ func TestClientCreateAndInfo(t *testing.T) { } } +func TestServerHeaderSupport(t *testing.T) { + opts := defaultServerOptions + opts.Port = -1 + s := New(&opts) + c, _, l := newClientForServer(s) + defer c.close() + + if !strings.HasPrefix(l, "INFO ") { + t.Fatalf("INFO response incorrect: %s\n", l) + } + var info serverInfo + if err := json.Unmarshal([]byte(l[5:]), &info); err != nil { + t.Fatalf("Could not parse INFO json: %v\n", err) + } + if !info.Headers { + t.Fatalf("Expected by default for header support to be enabled") + } + + opts.NoHeaderSupport = true + opts.Port = -1 + s = New(&opts) + c, _, l = newClientForServer(s) + defer c.close() + + if err := json.Unmarshal([]byte(l[5:]), &info); err != nil { + t.Fatalf("Could not parse INFO json: %v\n", err) + } + if info.Headers { + t.Fatalf("Expected header support to be disabled") + } +} + func TestNonTLSConnectionState(t *testing.T) { _, c, _ := setupClient() defer c.close() diff --git a/server/const.go b/server/const.go index 12cea5f3..0d97e045 100644 --- a/server/const.go +++ b/server/const.go @@ -130,6 +130,9 @@ const ( // MAX_PUB_ARGS Maximum possible number of arguments from PUB proto. MAX_PUB_ARGS = 3 + // MAX_HPUB_ARGS Maximum possible number of arguments from HPUB proto. + MAX_HPUB_ARGS = 4 + // DEFAULT_MAX_CLOSED_CLIENTS is the maximum number of closed connections we hold onto. DEFAULT_MAX_CLOSED_CLIENTS = 10000 diff --git a/server/errors.go b/server/errors.go index d10474bd..5562a9da 100644 --- a/server/errors.go +++ b/server/errors.go @@ -135,8 +135,15 @@ var ( // ErrRevocation is returned when a credential has been revoked. ErrRevocation = errors.New("credentials have been revoked") - // Used to signal an error that a server is not running. + // ErrServerNotRunning is used to signal an error that a server is not running. ErrServerNotRunning = errors.New("server is not running") + + // ErrBadMsgHeader signals the parser detected a bad message header + ErrBadMsgHeader = errors.New("bad message header detected") + + // ErrMsgHeadersNotSupported signals the parser detected a message header + // but they are not supported on this server. + ErrMsgHeadersNotSupported = errors.New("message headers not supported") ) // configErr is a configuration error. diff --git a/server/monitor.go b/server/monitor.go index 47748302..46c86a01 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1868,6 +1868,8 @@ func (reason ClosedState) String() string { return "Credentials Revoked" case InternalClient: return "Internal Client" + case MsgHeaderViolation: + return "Message Header Violation" } return "Unknown State" } diff --git a/server/opts.go b/server/opts.go index eaca5cee..a0b52e5e 100644 --- a/server/opts.go +++ b/server/opts.go @@ -158,6 +158,7 @@ type Options struct { NoLog bool `json:"-"` NoSigs bool `json:"-"` NoSublistCache bool `json:"-"` + NoHeaderSupport bool `json:"-"` DisableShortFirstPing bool `json:"-"` Logtime bool `json:"-"` MaxConn int `json:"max_connections"` diff --git a/server/parser.go b/server/parser.go index f0e3ea86..7ce12048 100644 --- a/server/parser.go +++ b/server/parser.go @@ -17,6 +17,17 @@ import ( "fmt" ) +type parserState int +type parseState struct { + state parserState + as int + drop int + pa pubArg + argBuf []byte + msgBuf []byte + scratch [MAX_CONTROL_LINE_SIZE]byte +} + type pubArg struct { arg []byte pacache []byte @@ -27,17 +38,7 @@ type pubArg struct { szb []byte queues [][]byte size int -} - -type parserState int -type parseState struct { - state parserState - as int - drop int - pa pubArg - argBuf []byte - msgBuf []byte - scratch [MAX_CONTROL_LINE_SIZE]byte + hdr int } // Parser constants @@ -60,6 +61,12 @@ const ( OP_CONNEC OP_CONNECT CONNECT_ARG + OP_H + OP_HP + OP_HPU + OP_HPUB + OP_HPUB_SPC + HPUB_ARG OP_P OP_PU OP_PUB @@ -144,6 +151,8 @@ func (c *client) parse(buf []byte) error { switch b { case 'P', 'p': c.state = OP_P + case 'H', 'h': + c.state = OP_H case 'S', 's': c.state = OP_S case 'U', 'u': @@ -177,6 +186,73 @@ func (c *client) parse(buf []byte) error { default: goto parseErr } + case OP_H: + switch b { + case 'P', 'p': + c.state = OP_HP + default: + goto parseErr + } + case OP_HP: + switch b { + case 'U', 'u': + c.state = OP_HPU + default: + goto parseErr + } + case OP_HPU: + switch b { + case 'B', 'b': + c.state = OP_HPUB + default: + goto parseErr + } + case OP_HPUB: + switch b { + case ' ', '\t': + c.state = OP_HPUB_SPC + default: + goto parseErr + } + case OP_HPUB_SPC: + switch b { + case ' ', '\t': + continue + default: + c.pa.hdr = 0 + c.state = HPUB_ARG + c.as = i + } + case HPUB_ARG: + switch b { + case '\r': + c.drop = 1 + case '\n': + var arg []byte + if c.argBuf != nil { + arg = c.argBuf + c.argBuf = nil + } else { + arg = buf[c.as : i-c.drop] + } + if trace { + c.traceInOp("HPUB", arg) + } + if err := c.processHeaderPub(arg); err != nil { + return err + } + c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD + // If we don't have a saved buffer then jump ahead with + // the index. If this overruns what is left we fall out + // and process split buffer. + if c.msgBuf == nil { + i = c.as + c.pa.size - LEN_CR_LF + } + default: + if c.argBuf != nil { + c.argBuf = append(c.argBuf, b) + } + } case OP_P: switch b { case 'U', 'u': @@ -207,6 +283,7 @@ func (c *client) parse(buf []byte) error { case ' ', '\t': continue default: + c.pa.hdr = -1 c.state = PUB_ARG c.as = i } @@ -290,7 +367,7 @@ func (c *client) parse(buf []byte) error { c.drop, c.as, c.state = 0, i+1, OP_START // Drop all pub args c.pa.arg, c.pa.pacache, c.pa.account, c.pa.subject = nil, nil, nil, nil - c.pa.reply, c.pa.size, c.pa.szb, c.pa.queues = nil, 0, nil, nil + c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.queues = nil, -1, 0, nil, nil case OP_A: switch b { case '+': @@ -895,7 +972,8 @@ func (c *client) parse(buf []byte) error { } // Check for split buffer scenarios for any ARG state. - if c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG || + if c.state == SUB_ARG || c.state == UNSUB_ARG || + c.state == PUB_ARG || c.state == HPUB_ARG || c.state == ASUB_ARG || c.state == AUSUB_ARG || c.state == MSG_ARG || c.state == MINUS_ERR_ARG || c.state == CONNECT_ARG || c.state == INFO_ARG { @@ -985,6 +1063,10 @@ func (c *client) clonePubArg() error { case LEAF: return c.processLeafMsgArgs(c.argBuf) default: - return c.processPub(c.argBuf) + if c.pa.hdr < 0 { + return c.processPub(c.argBuf) + } else { + return c.processHeaderPub(c.argBuf) + } } } diff --git a/server/parser_test.go b/server/parser_test.go index c2834cfc..945f514d 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -235,70 +235,38 @@ func TestParsePubArg(t *testing.T) { size int szb string }{ - {arg: "a 2", - subject: "a", reply: "", size: 2, szb: "2"}, - {arg: "a 222", - subject: "a", reply: "", size: 222, szb: "222"}, - {arg: "foo 22", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: " foo 22", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: "foo 22 ", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: "foo 22", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: " foo 22 ", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: " foo 22 ", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: "foo bar 22", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: " foo bar 22", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: "foo bar 22 ", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: "foo bar 22", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: " foo bar 22 ", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: " foo bar 22 ", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: " foo bar 2222 ", - subject: "foo", reply: "bar", size: 2222, szb: "2222"}, - {arg: " foo 2222 ", - subject: "foo", reply: "", size: 2222, szb: "2222"}, - {arg: "a\t2", - subject: "a", reply: "", size: 2, szb: "2"}, - {arg: "a\t222", - subject: "a", reply: "", size: 222, szb: "222"}, - {arg: "foo\t22", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: "\tfoo\t22", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: "foo\t22\t", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: "foo\t\t\t22", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: "\tfoo\t22\t", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: "\tfoo\t\t\t22\t", - subject: "foo", reply: "", size: 22, szb: "22"}, - {arg: "foo\tbar\t22", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: "\tfoo\tbar\t22", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: "foo\tbar\t22\t", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: "foo\t\tbar\t\t22", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: "\tfoo\tbar\t22\t", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: "\t \tfoo\t \t \tbar\t \t22\t \t", - subject: "foo", reply: "bar", size: 22, szb: "22"}, - {arg: "\t\tfoo\t\t\tbar\t\t2222\t\t", - subject: "foo", reply: "bar", size: 2222, szb: "2222"}, - {arg: "\t \tfoo\t \t \t\t\t2222\t \t", - subject: "foo", reply: "", size: 2222, szb: "2222"}, + {arg: "a 2", subject: "a", reply: "", size: 2, szb: "2"}, + {arg: "a 222", subject: "a", reply: "", size: 222, szb: "222"}, + {arg: "foo 22", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: " foo 22", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: "foo 22 ", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: "foo 22", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: " foo 22 ", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: " foo 22 ", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: "foo bar 22", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: " foo bar 22", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: "foo bar 22 ", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: "foo bar 22", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: " foo bar 22 ", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: " foo bar 22 ", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: " foo bar 2222 ", subject: "foo", reply: "bar", size: 2222, szb: "2222"}, + {arg: " foo 2222 ", subject: "foo", reply: "", size: 2222, szb: "2222"}, + {arg: "a\t2", subject: "a", reply: "", size: 2, szb: "2"}, + {arg: "a\t222", subject: "a", reply: "", size: 222, szb: "222"}, + {arg: "foo\t22", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: "\tfoo\t22", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: "foo\t22\t", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: "foo\t\t\t22", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: "\tfoo\t22\t", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: "\tfoo\t\t\t22\t", subject: "foo", reply: "", size: 22, szb: "22"}, + {arg: "foo\tbar\t22", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: "\tfoo\tbar\t22", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: "foo\tbar\t22\t", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: "foo\t\tbar\t\t22", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: "\tfoo\tbar\t22\t", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: "\t \tfoo\t \t \tbar\t \t22\t \t", subject: "foo", reply: "bar", size: 22, szb: "22"}, + {arg: "\t\tfoo\t\t\tbar\t\t2222\t\t", subject: "foo", reply: "bar", size: 2222, szb: "2222"}, + {arg: "\t \tfoo\t \t \t\t\t2222\t \t", subject: "foo", reply: "", size: 2222, szb: "2222"}, } { t.Run(test.arg, func(t *testing.T) { if err := c.processPub([]byte(test.arg)); err != nil { @@ -329,6 +297,134 @@ func TestParsePubBadSize(t *testing.T) { } } +func TestParseHeaderPub(t *testing.T) { + c := dummyClient() + + hpub := []byte("HPUB foo 12 17\r\nname:derek\r\nHELLO\r") + if err := c.parse(hpub); err != nil || c.state != MSG_END_N { + t.Fatalf("Unexpected: %d : %v\n", c.state, err) + } + if !bytes.Equal(c.pa.subject, []byte("foo")) { + t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject) + } + if c.pa.reply != nil { + t.Fatalf("Did not parse reply correctly: 'nil' vs '%s'\n", c.pa.reply) + } + if c.pa.hdr != 12 { + t.Fatalf("Did not parse msg header size correctly: 12 vs %d\n", c.pa.hdr) + } + if c.pa.size != 17 { + t.Fatalf("Did not parse msg size correctly: 17 vs %d\n", c.pa.size) + } + + // Clear snapshots + c.argBuf, c.msgBuf, c.state = nil, nil, OP_START + + hpub = []byte("HPUB foo INBOX.22 12 17\r\nname:derek\r\nHELLO\r") + if err := c.parse(hpub); err != nil || c.state != MSG_END_N { + t.Fatalf("Unexpected: %d : %v\n", c.state, err) + } + if !bytes.Equal(c.pa.subject, []byte("foo")) { + t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject) + } + if !bytes.Equal(c.pa.reply, []byte("INBOX.22")) { + t.Fatalf("Did not parse reply correctly: 'INBOX.22' vs '%s'\n", c.pa.reply) + } + if c.pa.hdr != 12 { + t.Fatalf("Did not parse msg header size correctly: 12 vs %d\n", c.pa.hdr) + } + if c.pa.size != 17 { + t.Fatalf("Did not parse msg size correctly: 17 vs %d\n", c.pa.size) + } + + // Clear snapshots + c.argBuf, c.msgBuf, c.state = nil, nil, OP_START + + hpub = []byte("HPUB foo INBOX.22 0 5\r\nHELLO\r") + if err := c.parse(hpub); err != nil || c.state != MSG_END_N { + t.Fatalf("Unexpected: %d : %v\n", c.state, err) + } + if !bytes.Equal(c.pa.subject, []byte("foo")) { + t.Fatalf("Did not parse subject correctly: 'foo' vs '%s'\n", c.pa.subject) + } + if !bytes.Equal(c.pa.reply, []byte("INBOX.22")) { + t.Fatalf("Did not parse reply correctly: 'INBOX.22' vs '%s'\n", c.pa.reply) + } + if c.pa.hdr != 0 { + t.Fatalf("Did not parse msg header size correctly: 0 vs %d\n", c.pa.hdr) + } + if c.pa.size != 5 { + t.Fatalf("Did not parse msg size correctly: 5 vs %d\n", c.pa.size) + } +} + +func TestParseHeaderPubArg(t *testing.T) { + c := dummyClient() + + for _, test := range []struct { + arg string + subject string + reply string + hdr int + size int + szb string + }{ + {arg: "a 2 4", subject: "a", reply: "", hdr: 2, size: 4, szb: "4"}, + {arg: "a 22 222", subject: "a", reply: "", hdr: 22, size: 222, szb: "222"}, + {arg: "foo 3 22", subject: "foo", reply: "", hdr: 3, size: 22, szb: "22"}, + {arg: " foo 1 22", subject: "foo", reply: "", hdr: 1, size: 22, szb: "22"}, + {arg: "foo 0 22 ", subject: "foo", reply: "", hdr: 0, size: 22, szb: "22"}, + {arg: "foo 0 22", subject: "foo", reply: "", hdr: 0, size: 22, szb: "22"}, + {arg: " foo 1 22 ", subject: "foo", reply: "", hdr: 1, size: 22, szb: "22"}, + {arg: " foo 3 22 ", subject: "foo", reply: "", hdr: 3, size: 22, szb: "22"}, + {arg: "foo bar 1 22", subject: "foo", reply: "bar", hdr: 1, size: 22, szb: "22"}, + {arg: " foo bar 11 22", subject: "foo", reply: "bar", hdr: 11, size: 22, szb: "22"}, + {arg: "foo bar 11 22 ", subject: "foo", reply: "bar", hdr: 11, size: 22, szb: "22"}, + {arg: "foo bar 11 22", subject: "foo", reply: "bar", hdr: 11, size: 22, szb: "22"}, + {arg: " foo bar 11 22 ", subject: "foo", reply: "bar", hdr: 11, size: 22, szb: "22"}, + {arg: " foo bar 11 22 ", subject: "foo", reply: "bar", hdr: 11, size: 22, szb: "22"}, + {arg: " foo bar 22 2222 ", subject: "foo", reply: "bar", hdr: 22, size: 2222, szb: "2222"}, + {arg: " foo 1 2222 ", subject: "foo", reply: "", hdr: 1, size: 2222, szb: "2222"}, + {arg: "a\t2\t22", subject: "a", reply: "", hdr: 2, size: 22, szb: "22"}, + {arg: "a\t2\t\t222", subject: "a", reply: "", hdr: 2, size: 222, szb: "222"}, + {arg: "foo\t2 22", subject: "foo", reply: "", hdr: 2, size: 22, szb: "22"}, + {arg: "\tfoo\t11\t 22", subject: "foo", reply: "", hdr: 11, size: 22, szb: "22"}, + {arg: "foo\t11\t22\t", subject: "foo", reply: "", hdr: 11, size: 22, szb: "22"}, + {arg: "foo\t\t\t11 22", subject: "foo", reply: "", hdr: 11, size: 22, szb: "22"}, + {arg: "\tfoo\t11\t \t 22\t", subject: "foo", reply: "", hdr: 11, size: 22, szb: "22"}, + {arg: "\tfoo\t\t\t11 22\t", subject: "foo", reply: "", hdr: 11, size: 22, szb: "22"}, + {arg: "foo\tbar\t2 22", subject: "foo", reply: "bar", hdr: 2, size: 22, szb: "22"}, + {arg: "\tfoo\tbar\t11\t22", subject: "foo", reply: "bar", hdr: 11, size: 22, szb: "22"}, + {arg: "foo\tbar\t11\t\t22\t ", subject: "foo", reply: "bar", hdr: 11, size: 22, szb: "22"}, + {arg: "foo\t\tbar\t\t11\t\t\t22", subject: "foo", reply: "bar", hdr: 11, size: 22, szb: "22"}, + {arg: "\tfoo\tbar\t11\t22\t", subject: "foo", reply: "bar", hdr: 11, size: 22, szb: "22"}, + {arg: "\t \tfoo\t \t \tbar\t \t11\t 22\t \t", subject: "foo", reply: "bar", hdr: 11, size: 22, szb: "22"}, + {arg: "\t\tfoo\t\t\tbar\t\t22\t\t\t2222\t\t", subject: "foo", reply: "bar", hdr: 22, size: 2222, szb: "2222"}, + {arg: "\t \tfoo\t \t \t\t\t11\t\t 2222\t \t", subject: "foo", reply: "", hdr: 11, size: 2222, szb: "2222"}, + } { + t.Run(test.arg, func(t *testing.T) { + if err := c.processHeaderPub([]byte(test.arg)); err != nil { + t.Fatalf("Unexpected parse error: %v\n", err) + } + if !bytes.Equal(c.pa.subject, []byte(test.subject)) { + t.Fatalf("Mismatched subject: '%s'\n", c.pa.subject) + } + if !bytes.Equal(c.pa.reply, []byte(test.reply)) { + t.Fatalf("Mismatched reply subject: '%s'\n", c.pa.reply) + } + if !bytes.Equal(c.pa.szb, []byte(test.szb)) { + t.Fatalf("Bad size buf: '%s'\n", c.pa.szb) + } + if c.pa.hdr != test.hdr { + t.Fatalf("Bad header size: %d\n", c.pa.hdr) + } + if c.pa.size != test.size { + t.Fatalf("Bad size: %d\n", c.pa.size) + } + }) + } +} + func TestParseRouteMsg(t *testing.T) { c := dummyRouteClient() diff --git a/server/server.go b/server/server.go index 8e4b8269..dc374025 100644 --- a/server/server.go +++ b/server/server.go @@ -69,6 +69,7 @@ type Info struct { GoVersion string `json:"go"` Host string `json:"host"` Port int `json:"port"` + Headers bool `json:"headers"` AuthRequired bool `json:"auth_required,omitempty"` TLSRequired bool `json:"tls_required,omitempty"` TLSVerify bool `json:"tls_verify,omitempty"` @@ -273,6 +274,7 @@ func NewServer(opts *Options) (*Server, error) { TLSVerify: verify, MaxPayload: opts.MaxPayload, JetStream: opts.JetStream, + Headers: !opts.NoHeaderSupport, } now := time.Now() @@ -1844,6 +1846,7 @@ func (s *Server) createClient(conn net.Conn) *client { s.mu.Lock() info := s.copyInfo() c.nonce = []byte(info.Nonce) + c.headers = info.Headers s.totalClients++ s.mu.Unlock()