From b8e7b9b68e13a381ffdaa6916a405ccd4a173783 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 4 Feb 2019 17:07:49 -0800 Subject: [PATCH] Some Optimizations 1. Change outbound client structure to be smaller and more cache friendly. 2. Snapshot MaxControlLine into client structure (mcl) to avoid server opts lookup. Signed-off-by: Derek Collison --- server/client.go | 36 ++++++++++++++++++++++-------------- server/const.go | 2 +- server/parser.go | 17 ++++++++--------- server/parser_test.go | 4 ++-- server/split_test.go | 20 ++++++++++---------- 5 files changed, 43 insertions(+), 36 deletions(-) diff --git a/server/client.go b/server/client.go index fa57ebc5..7b0fa52b 100644 --- a/server/client.go +++ b/server/client.go @@ -146,6 +146,7 @@ type client struct { stats mpay int32 msubs int + mcl int mu sync.Mutex kind int cid uint64 @@ -199,14 +200,14 @@ type outbound struct { nb net.Buffers // net.Buffers for writev IO sz int // limit size per []byte, uses variable BufSize constants, start, min, max. sws int // Number of short writes, used for dynamic resizing. - pb int64 // Total pending/queued bytes. - pm int64 // Total pending/queued messages. + pb int // Total pending/queued bytes. + pm int // Total pending/queued messages. sg *sync.Cond // Flusher conditional for signaling. - sgw bool // Indicate flusher is waiting on condition wait. wdl time.Duration // Snapshot fo write deadline. - mp int64 // snapshot of max pending. + mp int // snapshot of max pending. fsp int // Flush signals that are pending from readLoop's pcd. lft time.Duration // Last flush time. + sgw bool // Indicate flusher is waiting on condition wait. } type perm struct { @@ -353,7 +354,7 @@ func (c *client) initClient() { opts := s.getOpts() // Snapshots to avoid mutex access in fast paths. c.out.wdl = opts.WriteDeadline - c.out.mp = opts.MaxPending + c.out.mp = int(opts.MaxPending) c.subs = make(map[string]*subscription) c.echo = true @@ -656,6 +657,15 @@ func (c *client) readLoop() { nc := c.nc s := c.srv c.in.rsz = startBufSize + // Snapshot max control line since currently can not be changed on reload and we + // were checking it on each call to parse. If this changes and we allow MaxControlLine + // to be reloaded without restart, this code will need to change. + c.mcl = MAX_CONTROL_LINE_SIZE + if s != nil { + if opts := s.getOpts(); opts != nil { + c.mcl = opts.MaxControlLine + } + } defer s.grWG.Done() c.mu.Unlock() @@ -845,11 +855,11 @@ func (c *client) flushOutbound() bool { c.out.lft = lft // Subtract from pending bytes and messages. - c.out.pb -= n + c.out.pb -= int(n) c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate. // Check for partial writes - if n != attempted && n > 0 { + if n != int64(attempted) && n > 0 { c.handlePartialWrite(nb) } else if n >= int64(c.out.sz) { c.out.sws = 0 @@ -892,10 +902,10 @@ func (c *client) flushOutbound() bool { } // Adjust based on what we wrote plus any pending. - pt := int(n + c.out.pb) + pt := int(n) + c.out.pb // Adjust sz as needed downward, keeping power of 2. - // We do this at a slower rate, hence the pt*4. + // We do this at a slower rate. if pt < c.out.sz && c.out.sz > minBufSize { c.out.sws++ if c.out.sws > shortsToShrink { @@ -1234,7 +1244,7 @@ func (c *client) queueOutbound(data []byte) bool { // Assume data will not be referenced referenced := false // Add to pending bytes total. - c.out.pb += int64(len(data)) + c.out.pb += len(data) // Check for slow consumer via pending bytes limit. // ok to return here, client is going away. @@ -2540,11 +2550,9 @@ func (c *client) clearAuthTimer() bool { // We may reuse atmr for expiring user jwts, // so check connectReceived. +// Lock assume held on entry. func (c *client) awaitingAuth() bool { - c.mu.Lock() - authSet := !c.flags.isSet(connectReceived) && c.atmr != nil - c.mu.Unlock() - return authSet + return !c.flags.isSet(connectReceived) && c.atmr != nil } // This will set the atmr for the JWT expiration time. diff --git a/server/const.go b/server/const.go index 535c2067..10524d24 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.0.0-RC2" + VERSION = "2.0.0-RC3" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/parser.go b/server/parser.go index e4a28e1d..2cb3d9f1 100644 --- a/server/parser.go +++ b/server/parser.go @@ -109,17 +109,14 @@ func (c *client) parse(buf []byte) error { var i int var b byte - // FIXME(dlc) - This is wasteful, only can change on reload. - mcl := MAX_CONTROL_LINE_SIZE - if c.srv != nil { - if opts := c.srv.getOpts(); opts != nil { - mcl = opts.MaxControlLine - } - } - - // Snapshot this, and reset when we receive a + // Snapshots + c.mu.Lock() + // Snapshot and then reset when we receive a // proper CONNECT if needed. authSet := c.awaitingAuth() + // Snapshot max control line as well. + mcl := c.mcl + c.mu.Unlock() // Move to loop instead of range syntax to allow jumping of i for i = 0; i < len(buf); i++ { @@ -606,7 +603,9 @@ func (c *client) parse(buf []byte) error { } c.drop, c.state = 0, OP_START // Reset notion on authSet + c.mu.Lock() authSet = c.awaitingAuth() + c.mu.Unlock() default: if c.argBuf != nil { c.argBuf = append(c.argBuf, b) diff --git a/server/parser_test.go b/server/parser_test.go index 4748d927..caf930ab 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -19,7 +19,7 @@ import ( ) func dummyClient() *client { - return &client{srv: New(&defaultServerOptions), msubs: -1, mpay: -1} + return &client{srv: New(&defaultServerOptions), msubs: -1, mpay: -1, mcl: MAX_CONTROL_LINE_SIZE} } func dummyRouteClient() *client { @@ -578,7 +578,7 @@ func TestParseOK(t *testing.T) { func TestMaxControlLine(t *testing.T) { c := dummyClient() - c.srv.opts.MaxControlLine = 8 + c.mcl = 8 pub := []byte("PUB foo.bar 11\r") err := c.parse(pub) diff --git a/server/split_test.go b/server/split_test.go index 0459c383..7c901646 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -30,7 +30,7 @@ func TestSplitBufferSubOp(t *testing.T) { } s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: gws} s.registerAccount(s.gacc) - c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription), nc: cli} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription), nc: cli} subop := []byte("SUB foo 1\r\n") subop1 := subop[:6] @@ -67,7 +67,7 @@ func TestSplitBufferSubOp(t *testing.T) { func TestSplitBufferUnsubOp(t *testing.T) { s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: &srvGateway{}} s.registerAccount(s.gacc) - c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} subop := []byte("SUB foo 1024\r\n") if err := c.parse(subop); err != nil { @@ -100,7 +100,7 @@ func TestSplitBufferUnsubOp(t *testing.T) { } func TestSplitBufferPubOp(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r") pub1 := pub[:2] pub2 := pub[2:9] @@ -166,7 +166,7 @@ func TestSplitBufferPubOp(t *testing.T) { } func TestSplitBufferPubOp2(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r\n") pub1 := pub[:30] pub2 := pub[30:] @@ -186,7 +186,7 @@ func TestSplitBufferPubOp2(t *testing.T) { } func TestSplitBufferPubOp3(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo bar 11\r\nhello world\r\n") pub := pubAll[:16] @@ -212,7 +212,7 @@ func TestSplitBufferPubOp3(t *testing.T) { } func TestSplitBufferPubOp4(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo 11\r\nhello world\r\n") pub := pubAll[:12] @@ -238,7 +238,7 @@ func TestSplitBufferPubOp4(t *testing.T) { } func TestSplitBufferPubOp5(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo 11\r\nhello world\r\n") // Splits need to be on MSG_END_R now too, so make sure we check that. @@ -257,7 +257,7 @@ func TestSplitBufferPubOp5(t *testing.T) { } func TestSplitConnectArg(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} connectAll := []byte("CONNECT {\"verbose\":false,\"tls_required\":false," + "\"user\":\"test\",\"pedantic\":true,\"pass\":\"pass\"}\r\n") @@ -306,7 +306,7 @@ func TestSplitConnectArg(t *testing.T) { func TestSplitDanglingArgBuf(t *testing.T) { s := New(&defaultServerOptions) - c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} // We test to make sure we do not dangle any argBufs after processing // since that could lead to performance issues. @@ -445,7 +445,7 @@ func TestSplitRoutedMsgArg(t *testing.T) { } func TestSplitBufferMsgOp(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription), kind: ROUTER} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription), kind: ROUTER} msg := []byte("RMSG $G foo.bar _INBOX.22 11\r\nhello world\r") msg1 := msg[:2] msg2 := msg[2:9]