diff --git a/TODO.md b/TODO.md index 5459f7bb..cae502a1 100644 --- a/TODO.md +++ b/TODO.md @@ -19,7 +19,7 @@ - [ ] Limit number of subscriptions a client can have, total memory usage etc. - [ ] Multi-tenant accounts with isolation of subject space - [ ] Pedantic state -- [ ] Write dynamic socket buffer sizes +- [X] Write dynamic socket buffer sizes - [X] Read dynamic socket buffer sizes - [X] Info updates contain other implicit route servers - [X] Sublist better at high concurrency, cache uses writelock always currently diff --git a/gnatsd.go b/gnatsd.go index d16a653b..02bda51d 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -67,10 +67,6 @@ func main() { flag.StringVar(&opts.TLSKey, "tlskey", "", "Private key for server certificate.") flag.StringVar(&opts.TLSCaCert, "tlscacert", "", "Client certificate CA for verification.") - // Not public per se, will be replaced with dynamic system, but can be used to lower memory footprint when - // lots of connections present. - flag.IntVar(&opts.BufSize, "bs", 0, "Read/Write buffer size per client connection.") - flag.Usage = server.Usage flag.Parse() diff --git a/server/client.go b/server/client.go index 8957d561..25bd4e4b 100644 --- a/server/client.go +++ b/server/client.go @@ -25,9 +25,9 @@ const ( // For controlling dynamic buffer sizes. const ( - startReadBufSize = 512 // For INFO JSON block - minReadBufSize = 128 - maxReadBufSize = 65536 + startBufSize = 512 // For INFO/CONNECT block + minBufSize = 128 + maxBufSize = 65536 ) // Type of client @@ -74,6 +74,7 @@ type readCache struct { inBytes int64 results map[string]*SublistResult prand *rand.Rand + wfc int64 } func (c *client) String() (id string) { @@ -115,7 +116,7 @@ func init() { func (c *client) initClient(tlsConn bool) { s := c.srv c.cid = atomic.AddUint64(&s.gcid, 1) - c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize) + c.bw = bufio.NewWriterSize(c.nc, startBufSize) c.subs = make(map[string]*subscription) c.debug = (atomic.LoadInt32(&debug) != 0) c.trace = (atomic.LoadInt32(&trace) != 0) @@ -165,7 +166,7 @@ func (c *client) readLoop() { } // Start read buffer. - b := make([]byte, startReadBufSize) + b := make([]byte, startBufSize) for { n, err := nc.Read(b) @@ -201,6 +202,11 @@ func (c *client) readLoop() { // Flush those in the set cp.mu.Lock() if cp.nc != nil { + // Gather the flush calls that happened before now. + // This is a signal into us about dynamic buffer allocation tuning. + wfc := atomic.LoadInt64(&cp.cache.wfc) + atomic.StoreInt64(&cp.cache.wfc, 0) + cp.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE)) err := cp.bw.Flush() cp.nc.SetWriteDeadline(time.Time{}) @@ -212,6 +218,16 @@ func (c *client) readLoop() { } else { // Update outbound last activity. cp.last = last + // Check if we should tune the buffer. + sz := cp.bw.Available() + // Check for expansion opportunity. + if wfc > 2 && sz <= maxBufSize/2 { + cp.bw = bufio.NewWriterSize(cp.nc, sz*2) + } + // Check for shrinking opportunity. + if wfc == 0 && sz >= minBufSize*2 { + cp.bw = bufio.NewWriterSize(cp.nc, sz/2) + } } } cp.mu.Unlock() @@ -229,12 +245,12 @@ func (c *client) readLoop() { // Update buffer size as/if needed. // Grow - if n == len(b) && len(b) < maxReadBufSize { + if n == len(b) && len(b) < maxBufSize { b = make([]byte, len(b)*2) } // Shrink, for now don't accelerate, ping/pong will eventually sort it out. - if n < len(b)/2 && len(b) > minReadBufSize { + if n < len(b)/2 && len(b) > minBufSize { b = make([]byte, len(b)/2) } } @@ -709,6 +725,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { deadlineSet := false if client.bw.Available() < (len(mh) + len(msg) + len(CR_LF)) { + atomic.AddInt64(&c.cache.wfc, 1) client.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE)) deadlineSet = true } @@ -879,8 +896,10 @@ func (c *client) processMsg(msg []byte) { qsubs := r.qsubs[i] index := c.cache.prand.Intn(len(qsubs)) sub := qsubs[index] - mh := c.msgHeader(msgh[:si], sub) - c.deliverMsg(sub, mh, msg) + if sub != nil { + mh := c.msgHeader(msgh[:si], sub) + c.deliverMsg(sub, mh, msg) + } } } } diff --git a/server/client_test.go b/server/client_test.go index 967ee490..5c6da488 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -52,7 +52,7 @@ var defaultServerOptions = Options{ func rawSetup(serverOptions Options) (*Server, *client, *bufio.Reader, string) { cli, srv := net.Pipe() - cr := bufio.NewReaderSize(cli, DEFAULT_BUF_SIZE) + cr := bufio.NewReaderSize(cli, maxBufSize) s := New(&serverOptions) if serverOptions.Authorization != "" { s.SetClientAuthMethod(&mockAuth{}) diff --git a/server/const.go b/server/const.go index 1d22da41..68bf9ba8 100644 --- a/server/const.go +++ b/server/const.go @@ -82,8 +82,4 @@ const ( // MAX_PUB_ARGS Maximum possible number of arguments from PUB proto. MAX_PUB_ARGS = 3 - - // DEFAULT_BUF_SIZE is the default buffer size for reads and writes per connection. - // It will be replaced by a dynamic system in the long run. - DEFAULT_BUF_SIZE = 32768 ) diff --git a/server/opts.go b/server/opts.go index afe88f8e..9fc522a5 100644 --- a/server/opts.go +++ b/server/opts.go @@ -53,7 +53,6 @@ type Options struct { RemoteSyslog string `json:"-"` Routes []*url.URL `json:"-"` RoutesStr string `json:"-"` - BufSize int `json:"-"` TLSTimeout float64 `json:"tls_timeout"` TLS bool `json:"-"` TLSVerify bool `json:"-"` @@ -572,7 +571,4 @@ func processOptions(opts *Options) { if opts.MaxPending == 0 { opts.MaxPending = MAX_PENDING_SIZE } - if opts.BufSize == 0 { - opts.BufSize = DEFAULT_BUF_SIZE - } } diff --git a/server/opts_test.go b/server/opts_test.go index 38b7714c..15cb315f 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -24,7 +24,6 @@ func TestDefaultOptions(t *testing.T) { MaxPending: MAX_PENDING_SIZE, ClusterAuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second), ClusterTLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second), - BufSize: DEFAULT_BUF_SIZE, } opts := &Options{} diff --git a/server/route.go b/server/route.go index c2a34467..1d6ddd92 100644 --- a/server/route.go +++ b/server/route.go @@ -1,4 +1,4 @@ -// Copyright 2013-2015 Apcera Inc. All rights reserved. +// Copyright 2013-2016 Apcera Inc. All rights reserved. package server @@ -331,7 +331,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { } // Rewrap bw - c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize) + c.bw = bufio.NewWriterSize(c.nc, startBufSize) // Do final client initialization diff --git a/server/server.go b/server/server.go index 4130bb67..b37e7d00 100644 --- a/server/server.go +++ b/server/server.go @@ -1,4 +1,4 @@ -// Copyright 2012-2015 Apcera Inc. All rights reserved. +// Copyright 2012-2016 Apcera Inc. All rights reserved. package server @@ -530,7 +530,7 @@ func (s *Server) createClient(conn net.Conn) *client { c.mu.Lock() // Rewrap bw - c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize) + c.bw = bufio.NewWriterSize(c.nc, startBufSize) // Do final client initialization