dynamic read buffers

This commit is contained in:
Derek Collison
2016-04-03 14:30:17 -07:00
parent ca4bde918b
commit 768f23b5b4
5 changed files with 58 additions and 11 deletions

View File

@@ -3,23 +3,22 @@
- [ ] Multiple listen endpoints
- [ ] Listen configure key vs addr and port
- [ ] Multiple Auth
- [ ] Authorization / Access
- [ ] Multiple Authorization / Access
- [ ] T series reservations
- [ ] _SYS. server events?
- [ ] No downtime restart
- [ ] Signal based reload of configuration
- [ ] Dynamic socket buffer sizes
- [ ] brew, apt-get, rpm, chocately (windows)
- [ ] Buffer pools/sync pools?
- [ ] IOVec pools and writev for high fanout?
- [ ] Add ability to reload config on signal
- [ ] Add ENV and variable support to dconf
- [ ] Add ENV and variable support to dconf? ucl?
- [ ] Modify cluster support for single message across routes between pub/sub and d-queue
- [ ] Memory limits/warnings?
- [ ] Limit number of subscriptions a client can have, total memory usage etc.
- [ ] Multi-tenant accounts with isolation of subject space
- [ ] Pedantic state
- [X] Dynamic socket buffer sizes
- [X] Info updates contain other implicit route servers
- [X] Sublist better at high concurrency, cache uses writelock always currently
- [X] Switch to 1.4/1.5 and use maps vs hashmaps in sublist

View File

@@ -23,6 +23,13 @@ const (
msgHeadProto = "MSG "
)
// For controlling dynamic buffer sizes.
const (
startReadBufSize = 512 // For INFO JSON block
minReadBufSize = 128
maxReadBufSize = 65536
)
// Type of client
const (
// CLIENT is an end user.
@@ -164,9 +171,8 @@ func (c *client) readLoop() {
return
}
// read buffer
// FIXME(dlc) make dynamic
b := make([]byte, s.opts.BufSize)
// Start read buffer.
b := make([]byte, startReadBufSize)
for {
n, err := nc.Read(b)
@@ -226,6 +232,18 @@ func (c *client) readLoop() {
if nc == nil {
return
}
// Update buffer size as/if needed.
// Grow
if n == len(b) && len(b) < maxReadBufSize {
b = make([]byte, len(b)*2)
}
// Shrink, for now don't accelerate, ping/pong will eventually sort it out.
if n < len(b)/2 && len(b) > minReadBufSize {
b = make([]byte, len(b)/2)
}
}
}

View File

@@ -8,7 +8,7 @@ import (
const (
// VERSION is the current version for the server.
VERSION = "0.7.9"
VERSION = "0.8.0.beta"
// DEFAULT_PORT is the deault port for client connections.
DEFAULT_PORT = 4222

View File

@@ -537,10 +537,20 @@ func (c *client) parse(buf []byte) error {
case '\r':
c.drop = 1
case '\n':
if err := c.processInfo(buf[c.as : i-c.drop]); err != nil {
var arg []byte
if c.argBuf != nil {
arg = c.argBuf
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processInfo(arg); err != nil {
return err
}
c.drop, c.state = 0, OP_START
c.drop, c.as, c.state = 0, i+1, OP_START
default:
if c.argBuf != nil {
c.argBuf = append(c.argBuf, b)
}
}
case OP_PLUS:
switch b {
@@ -623,7 +633,7 @@ func (c *client) parse(buf []byte) error {
// Check for split buffer scenarios for any ARG state.
if (c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG ||
c.state == MSG_ARG || c.state == MINUS_ERR_ARG ||
c.state == CONNECT_ARG) && c.argBuf == nil {
c.state == CONNECT_ARG || c.state == INFO_ARG) && c.argBuf == nil {
c.argBuf = c.scratch[:0]
c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...)
// FIXME(dlc), check max control line len

View File

@@ -1,4 +1,24 @@
2015 iMac5k 4Ghz i7 Haswell
OSX El Capitan 10.11.3
===================
Go version go1.6
===================
Benchmark____PubNo_Payload-8 20000000 107 ns/op 101.94 MB/s
Benchmark____Pub8b_Payload-8 20000000 111 ns/op 171.14 MB/s
Benchmark___Pub32b_Payload-8 20000000 118 ns/op 372.36 MB/s
Benchmark__Pub256B_Payload-8 10000000 160 ns/op 1671.47 MB/s
Benchmark____Pub1K_Payload-8 5000000 272 ns/op 3815.01 MB/s
Benchmark____Pub4K_Payload-8 2000000 967 ns/op 4249.82 MB/s
Benchmark____Pub8K_Payload-8 500000 2081 ns/op 3942.82 MB/s
Benchmark___________PubSub-8 5000000 250 ns/op
Benchmark___PubSubTwoConns-8 5000000 247 ns/op
Benchmark___PubTwoQueueSub-8 5000000 266 ns/op
Benchmark__PubFourQueueSub-8 5000000 270 ns/op
Benchmark_PubEightQueueSub-8 5000000 267 ns/op
OSX Yosemite 10.10.5
===================