mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Enable dynamic write buffers for client connections
This commit is contained in:
2
TODO.md
2
TODO.md
@@ -19,7 +19,7 @@
|
||||
- [ ] Limit number of subscriptions a client can have, total memory usage etc.
|
||||
- [ ] Multi-tenant accounts with isolation of subject space
|
||||
- [ ] Pedantic state
|
||||
- [ ] Write dynamic socket buffer sizes
|
||||
- [X] Write dynamic socket buffer sizes
|
||||
- [X] Read dynamic socket buffer sizes
|
||||
- [X] Info updates contain other implicit route servers
|
||||
- [X] Sublist better at high concurrency, cache uses writelock always currently
|
||||
|
||||
@@ -67,10 +67,6 @@ func main() {
|
||||
flag.StringVar(&opts.TLSKey, "tlskey", "", "Private key for server certificate.")
|
||||
flag.StringVar(&opts.TLSCaCert, "tlscacert", "", "Client certificate CA for verification.")
|
||||
|
||||
// Not public per se, will be replaced with dynamic system, but can be used to lower memory footprint when
|
||||
// lots of connections present.
|
||||
flag.IntVar(&opts.BufSize, "bs", 0, "Read/Write buffer size per client connection.")
|
||||
|
||||
flag.Usage = server.Usage
|
||||
|
||||
flag.Parse()
|
||||
|
||||
@@ -25,9 +25,9 @@ const (
|
||||
|
||||
// For controlling dynamic buffer sizes.
|
||||
const (
|
||||
startReadBufSize = 512 // For INFO JSON block
|
||||
minReadBufSize = 128
|
||||
maxReadBufSize = 65536
|
||||
startBufSize = 512 // For INFO/CONNECT block
|
||||
minBufSize = 128
|
||||
maxBufSize = 65536
|
||||
)
|
||||
|
||||
// Type of client
|
||||
@@ -74,6 +74,7 @@ type readCache struct {
|
||||
inBytes int64
|
||||
results map[string]*SublistResult
|
||||
prand *rand.Rand
|
||||
wfc int64
|
||||
}
|
||||
|
||||
func (c *client) String() (id string) {
|
||||
@@ -115,7 +116,7 @@ func init() {
|
||||
func (c *client) initClient(tlsConn bool) {
|
||||
s := c.srv
|
||||
c.cid = atomic.AddUint64(&s.gcid, 1)
|
||||
c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize)
|
||||
c.bw = bufio.NewWriterSize(c.nc, startBufSize)
|
||||
c.subs = make(map[string]*subscription)
|
||||
c.debug = (atomic.LoadInt32(&debug) != 0)
|
||||
c.trace = (atomic.LoadInt32(&trace) != 0)
|
||||
@@ -165,7 +166,7 @@ func (c *client) readLoop() {
|
||||
}
|
||||
|
||||
// Start read buffer.
|
||||
b := make([]byte, startReadBufSize)
|
||||
b := make([]byte, startBufSize)
|
||||
|
||||
for {
|
||||
n, err := nc.Read(b)
|
||||
@@ -201,6 +202,11 @@ func (c *client) readLoop() {
|
||||
// Flush those in the set
|
||||
cp.mu.Lock()
|
||||
if cp.nc != nil {
|
||||
// Gather the flush calls that happened before now.
|
||||
// This is a signal into us about dynamic buffer allocation tuning.
|
||||
wfc := atomic.LoadInt64(&cp.cache.wfc)
|
||||
atomic.StoreInt64(&cp.cache.wfc, 0)
|
||||
|
||||
cp.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE))
|
||||
err := cp.bw.Flush()
|
||||
cp.nc.SetWriteDeadline(time.Time{})
|
||||
@@ -212,6 +218,16 @@ func (c *client) readLoop() {
|
||||
} else {
|
||||
// Update outbound last activity.
|
||||
cp.last = last
|
||||
// Check if we should tune the buffer.
|
||||
sz := cp.bw.Available()
|
||||
// Check for expansion opportunity.
|
||||
if wfc > 2 && sz <= maxBufSize/2 {
|
||||
cp.bw = bufio.NewWriterSize(cp.nc, sz*2)
|
||||
}
|
||||
// Check for shrinking opportunity.
|
||||
if wfc == 0 && sz >= minBufSize*2 {
|
||||
cp.bw = bufio.NewWriterSize(cp.nc, sz/2)
|
||||
}
|
||||
}
|
||||
}
|
||||
cp.mu.Unlock()
|
||||
@@ -229,12 +245,12 @@ func (c *client) readLoop() {
|
||||
// Update buffer size as/if needed.
|
||||
|
||||
// Grow
|
||||
if n == len(b) && len(b) < maxReadBufSize {
|
||||
if n == len(b) && len(b) < maxBufSize {
|
||||
b = make([]byte, len(b)*2)
|
||||
}
|
||||
|
||||
// Shrink, for now don't accelerate, ping/pong will eventually sort it out.
|
||||
if n < len(b)/2 && len(b) > minReadBufSize {
|
||||
if n < len(b)/2 && len(b) > minBufSize {
|
||||
b = make([]byte, len(b)/2)
|
||||
}
|
||||
}
|
||||
@@ -709,6 +725,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
|
||||
|
||||
deadlineSet := false
|
||||
if client.bw.Available() < (len(mh) + len(msg) + len(CR_LF)) {
|
||||
atomic.AddInt64(&c.cache.wfc, 1)
|
||||
client.nc.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE))
|
||||
deadlineSet = true
|
||||
}
|
||||
@@ -879,8 +896,10 @@ func (c *client) processMsg(msg []byte) {
|
||||
qsubs := r.qsubs[i]
|
||||
index := c.cache.prand.Intn(len(qsubs))
|
||||
sub := qsubs[index]
|
||||
mh := c.msgHeader(msgh[:si], sub)
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
if sub != nil {
|
||||
mh := c.msgHeader(msgh[:si], sub)
|
||||
c.deliverMsg(sub, mh, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ var defaultServerOptions = Options{
|
||||
|
||||
func rawSetup(serverOptions Options) (*Server, *client, *bufio.Reader, string) {
|
||||
cli, srv := net.Pipe()
|
||||
cr := bufio.NewReaderSize(cli, DEFAULT_BUF_SIZE)
|
||||
cr := bufio.NewReaderSize(cli, maxBufSize)
|
||||
s := New(&serverOptions)
|
||||
if serverOptions.Authorization != "" {
|
||||
s.SetClientAuthMethod(&mockAuth{})
|
||||
|
||||
@@ -82,8 +82,4 @@ const (
|
||||
|
||||
// MAX_PUB_ARGS Maximum possible number of arguments from PUB proto.
|
||||
MAX_PUB_ARGS = 3
|
||||
|
||||
// DEFAULT_BUF_SIZE is the default buffer size for reads and writes per connection.
|
||||
// It will be replaced by a dynamic system in the long run.
|
||||
DEFAULT_BUF_SIZE = 32768
|
||||
)
|
||||
|
||||
@@ -53,7 +53,6 @@ type Options struct {
|
||||
RemoteSyslog string `json:"-"`
|
||||
Routes []*url.URL `json:"-"`
|
||||
RoutesStr string `json:"-"`
|
||||
BufSize int `json:"-"`
|
||||
TLSTimeout float64 `json:"tls_timeout"`
|
||||
TLS bool `json:"-"`
|
||||
TLSVerify bool `json:"-"`
|
||||
@@ -572,7 +571,4 @@ func processOptions(opts *Options) {
|
||||
if opts.MaxPending == 0 {
|
||||
opts.MaxPending = MAX_PENDING_SIZE
|
||||
}
|
||||
if opts.BufSize == 0 {
|
||||
opts.BufSize = DEFAULT_BUF_SIZE
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,6 @@ func TestDefaultOptions(t *testing.T) {
|
||||
MaxPending: MAX_PENDING_SIZE,
|
||||
ClusterAuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second),
|
||||
ClusterTLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second),
|
||||
BufSize: DEFAULT_BUF_SIZE,
|
||||
}
|
||||
|
||||
opts := &Options{}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2013-2015 Apcera Inc. All rights reserved.
|
||||
// Copyright 2013-2016 Apcera Inc. All rights reserved.
|
||||
|
||||
package server
|
||||
|
||||
@@ -331,7 +331,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
|
||||
}
|
||||
|
||||
// Rewrap bw
|
||||
c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize)
|
||||
c.bw = bufio.NewWriterSize(c.nc, startBufSize)
|
||||
|
||||
// Do final client initialization
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2015 Apcera Inc. All rights reserved.
|
||||
// Copyright 2012-2016 Apcera Inc. All rights reserved.
|
||||
|
||||
package server
|
||||
|
||||
@@ -530,7 +530,7 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
c.mu.Lock()
|
||||
|
||||
// Rewrap bw
|
||||
c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize)
|
||||
c.bw = bufio.NewWriterSize(c.nc, startBufSize)
|
||||
|
||||
// Do final client initialization
|
||||
|
||||
|
||||
Reference in New Issue
Block a user