diff --git a/TODO.md b/TODO.md index cadb2475..767da3c7 100644 --- a/TODO.md +++ b/TODO.md @@ -3,23 +3,22 @@ - [ ] Multiple listen endpoints - [ ] Listen configure key vs addr and port -- [ ] Multiple Auth -- [ ] Authorization / Access +- [ ] Multiple Authorization / Access - [ ] T series reservations - [ ] _SYS. server events? - [ ] No downtime restart - [ ] Signal based reload of configuration -- [ ] Dynamic socket buffer sizes - [ ] brew, apt-get, rpm, chocately (windows) - [ ] Buffer pools/sync pools? - [ ] IOVec pools and writev for high fanout? - [ ] Add ability to reload config on signal -- [ ] Add ENV and variable support to dconf +- [ ] Add ENV and variable support to dconf? ucl? - [ ] Modify cluster support for single message across routes between pub/sub and d-queue - [ ] Memory limits/warnings? - [ ] Limit number of subscriptions a client can have, total memory usage etc. - [ ] Multi-tenant accounts with isolation of subject space - [ ] Pedantic state +- [X] Dynamic socket buffer sizes - [X] Info updates contain other implicit route servers - [X] Sublist better at high concurrency, cache uses writelock always currently - [X] Switch to 1.4/1.5 and use maps vs hashmaps in sublist diff --git a/server/client.go b/server/client.go index 976b3146..e17d123d 100644 --- a/server/client.go +++ b/server/client.go @@ -23,6 +23,13 @@ const ( msgHeadProto = "MSG " ) +// For controlling dynamic buffer sizes. +const ( + startReadBufSize = 512 // For INFO JSON block + minReadBufSize = 128 + maxReadBufSize = 65536 +) + // Type of client const ( // CLIENT is an end user. @@ -164,9 +171,8 @@ func (c *client) readLoop() { return } - // read buffer - // FIXME(dlc) make dynamic - b := make([]byte, s.opts.BufSize) + // Start read buffer. + b := make([]byte, startReadBufSize) for { n, err := nc.Read(b) @@ -226,6 +232,18 @@ func (c *client) readLoop() { if nc == nil { return } + + // Update buffer size as/if needed. + + // Grow + if n == len(b) && len(b) < maxReadBufSize { + b = make([]byte, len(b)*2) + } + + // Shrink, for now don't accelerate, ping/pong will eventually sort it out. + if n < len(b)/2 && len(b) > minReadBufSize { + b = make([]byte, len(b)/2) + } } } diff --git a/server/const.go b/server/const.go index db999477..1d22da41 100644 --- a/server/const.go +++ b/server/const.go @@ -8,7 +8,7 @@ import ( const ( // VERSION is the current version for the server. - VERSION = "0.7.9" + VERSION = "0.8.0.beta" // DEFAULT_PORT is the deault port for client connections. DEFAULT_PORT = 4222 diff --git a/server/parser.go b/server/parser.go index bd1870e5..b369fbc7 100644 --- a/server/parser.go +++ b/server/parser.go @@ -537,10 +537,20 @@ func (c *client) parse(buf []byte) error { case '\r': c.drop = 1 case '\n': - if err := c.processInfo(buf[c.as : i-c.drop]); err != nil { + var arg []byte + if c.argBuf != nil { + arg = c.argBuf + } else { + arg = buf[c.as : i-c.drop] + } + if err := c.processInfo(arg); err != nil { return err } - c.drop, c.state = 0, OP_START + c.drop, c.as, c.state = 0, i+1, OP_START + default: + if c.argBuf != nil { + c.argBuf = append(c.argBuf, b) + } } case OP_PLUS: switch b { @@ -623,7 +633,7 @@ func (c *client) parse(buf []byte) error { // Check for split buffer scenarios for any ARG state. if (c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG || c.state == MSG_ARG || c.state == MINUS_ERR_ARG || - c.state == CONNECT_ARG) && c.argBuf == nil { + c.state == CONNECT_ARG || c.state == INFO_ARG) && c.argBuf == nil { c.argBuf = c.scratch[:0] c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...) // FIXME(dlc), check max control line len diff --git a/test/bench_results.txt b/test/bench_results.txt index 4946bb92..ff796d11 100644 --- a/test/bench_results.txt +++ b/test/bench_results.txt @@ -1,4 +1,24 @@ 2015 iMac5k 4Ghz i7 Haswell +OSX El Capitan 10.11.3 + +=================== +Go version go1.6 +=================== + +Benchmark____PubNo_Payload-8 20000000 107 ns/op 101.94 MB/s +Benchmark____Pub8b_Payload-8 20000000 111 ns/op 171.14 MB/s +Benchmark___Pub32b_Payload-8 20000000 118 ns/op 372.36 MB/s +Benchmark__Pub256B_Payload-8 10000000 160 ns/op 1671.47 MB/s +Benchmark____Pub1K_Payload-8 5000000 272 ns/op 3815.01 MB/s +Benchmark____Pub4K_Payload-8 2000000 967 ns/op 4249.82 MB/s +Benchmark____Pub8K_Payload-8 500000 2081 ns/op 3942.82 MB/s +Benchmark___________PubSub-8 5000000 250 ns/op +Benchmark___PubSubTwoConns-8 5000000 247 ns/op +Benchmark___PubTwoQueueSub-8 5000000 266 ns/op +Benchmark__PubFourQueueSub-8 5000000 270 ns/op +Benchmark_PubEightQueueSub-8 5000000 267 ns/op + + OSX Yosemite 10.10.5 ===================