mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Some Optimizations
1. Change outbound client structure to be smaller and more cache friendly. 2. Snapshot MaxControlLine into client structure (mcl) to avoid server opts lookup. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -146,6 +146,7 @@ type client struct {
|
||||
stats
|
||||
mpay int32
|
||||
msubs int
|
||||
mcl int
|
||||
mu sync.Mutex
|
||||
kind int
|
||||
cid uint64
|
||||
@@ -199,14 +200,14 @@ type outbound struct {
|
||||
nb net.Buffers // net.Buffers for writev IO
|
||||
sz int // limit size per []byte, uses variable BufSize constants, start, min, max.
|
||||
sws int // Number of short writes, used for dynamic resizing.
|
||||
pb int64 // Total pending/queued bytes.
|
||||
pm int64 // Total pending/queued messages.
|
||||
pb int // Total pending/queued bytes.
|
||||
pm int // Total pending/queued messages.
|
||||
sg *sync.Cond // Flusher conditional for signaling.
|
||||
sgw bool // Indicate flusher is waiting on condition wait.
|
||||
wdl time.Duration // Snapshot fo write deadline.
|
||||
mp int64 // snapshot of max pending.
|
||||
mp int // snapshot of max pending.
|
||||
fsp int // Flush signals that are pending from readLoop's pcd.
|
||||
lft time.Duration // Last flush time.
|
||||
sgw bool // Indicate flusher is waiting on condition wait.
|
||||
}
|
||||
|
||||
type perm struct {
|
||||
@@ -353,7 +354,7 @@ func (c *client) initClient() {
|
||||
opts := s.getOpts()
|
||||
// Snapshots to avoid mutex access in fast paths.
|
||||
c.out.wdl = opts.WriteDeadline
|
||||
c.out.mp = opts.MaxPending
|
||||
c.out.mp = int(opts.MaxPending)
|
||||
|
||||
c.subs = make(map[string]*subscription)
|
||||
c.echo = true
|
||||
@@ -656,6 +657,15 @@ func (c *client) readLoop() {
|
||||
nc := c.nc
|
||||
s := c.srv
|
||||
c.in.rsz = startBufSize
|
||||
// Snapshot max control line since currently can not be changed on reload and we
|
||||
// were checking it on each call to parse. If this changes and we allow MaxControlLine
|
||||
// to be reloaded without restart, this code will need to change.
|
||||
c.mcl = MAX_CONTROL_LINE_SIZE
|
||||
if s != nil {
|
||||
if opts := s.getOpts(); opts != nil {
|
||||
c.mcl = opts.MaxControlLine
|
||||
}
|
||||
}
|
||||
defer s.grWG.Done()
|
||||
c.mu.Unlock()
|
||||
|
||||
@@ -845,11 +855,11 @@ func (c *client) flushOutbound() bool {
|
||||
c.out.lft = lft
|
||||
|
||||
// Subtract from pending bytes and messages.
|
||||
c.out.pb -= n
|
||||
c.out.pb -= int(n)
|
||||
c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate.
|
||||
|
||||
// Check for partial writes
|
||||
if n != attempted && n > 0 {
|
||||
if n != int64(attempted) && n > 0 {
|
||||
c.handlePartialWrite(nb)
|
||||
} else if n >= int64(c.out.sz) {
|
||||
c.out.sws = 0
|
||||
@@ -892,10 +902,10 @@ func (c *client) flushOutbound() bool {
|
||||
}
|
||||
|
||||
// Adjust based on what we wrote plus any pending.
|
||||
pt := int(n + c.out.pb)
|
||||
pt := int(n) + c.out.pb
|
||||
|
||||
// Adjust sz as needed downward, keeping power of 2.
|
||||
// We do this at a slower rate, hence the pt*4.
|
||||
// We do this at a slower rate.
|
||||
if pt < c.out.sz && c.out.sz > minBufSize {
|
||||
c.out.sws++
|
||||
if c.out.sws > shortsToShrink {
|
||||
@@ -1234,7 +1244,7 @@ func (c *client) queueOutbound(data []byte) bool {
|
||||
// Assume data will not be referenced
|
||||
referenced := false
|
||||
// Add to pending bytes total.
|
||||
c.out.pb += int64(len(data))
|
||||
c.out.pb += len(data)
|
||||
|
||||
// Check for slow consumer via pending bytes limit.
|
||||
// ok to return here, client is going away.
|
||||
@@ -2540,11 +2550,9 @@ func (c *client) clearAuthTimer() bool {
|
||||
|
||||
// We may reuse atmr for expiring user jwts,
|
||||
// so check connectReceived.
|
||||
// Lock assume held on entry.
|
||||
func (c *client) awaitingAuth() bool {
|
||||
c.mu.Lock()
|
||||
authSet := !c.flags.isSet(connectReceived) && c.atmr != nil
|
||||
c.mu.Unlock()
|
||||
return authSet
|
||||
return !c.flags.isSet(connectReceived) && c.atmr != nil
|
||||
}
|
||||
|
||||
// This will set the atmr for the JWT expiration time.
|
||||
|
||||
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.0.0-RC2"
|
||||
VERSION = "2.0.0-RC3"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -109,17 +109,14 @@ func (c *client) parse(buf []byte) error {
|
||||
var i int
|
||||
var b byte
|
||||
|
||||
// FIXME(dlc) - This is wasteful, only can change on reload.
|
||||
mcl := MAX_CONTROL_LINE_SIZE
|
||||
if c.srv != nil {
|
||||
if opts := c.srv.getOpts(); opts != nil {
|
||||
mcl = opts.MaxControlLine
|
||||
}
|
||||
}
|
||||
|
||||
// Snapshot this, and reset when we receive a
|
||||
// Snapshots
|
||||
c.mu.Lock()
|
||||
// Snapshot and then reset when we receive a
|
||||
// proper CONNECT if needed.
|
||||
authSet := c.awaitingAuth()
|
||||
// Snapshot max control line as well.
|
||||
mcl := c.mcl
|
||||
c.mu.Unlock()
|
||||
|
||||
// Move to loop instead of range syntax to allow jumping of i
|
||||
for i = 0; i < len(buf); i++ {
|
||||
@@ -606,7 +603,9 @@ func (c *client) parse(buf []byte) error {
|
||||
}
|
||||
c.drop, c.state = 0, OP_START
|
||||
// Reset notion on authSet
|
||||
c.mu.Lock()
|
||||
authSet = c.awaitingAuth()
|
||||
c.mu.Unlock()
|
||||
default:
|
||||
if c.argBuf != nil {
|
||||
c.argBuf = append(c.argBuf, b)
|
||||
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
)
|
||||
|
||||
func dummyClient() *client {
|
||||
return &client{srv: New(&defaultServerOptions), msubs: -1, mpay: -1}
|
||||
return &client{srv: New(&defaultServerOptions), msubs: -1, mpay: -1, mcl: MAX_CONTROL_LINE_SIZE}
|
||||
}
|
||||
|
||||
func dummyRouteClient() *client {
|
||||
@@ -578,7 +578,7 @@ func TestParseOK(t *testing.T) {
|
||||
|
||||
func TestMaxControlLine(t *testing.T) {
|
||||
c := dummyClient()
|
||||
c.srv.opts.MaxControlLine = 8
|
||||
c.mcl = 8
|
||||
|
||||
pub := []byte("PUB foo.bar 11\r")
|
||||
err := c.parse(pub)
|
||||
|
||||
@@ -30,7 +30,7 @@ func TestSplitBufferSubOp(t *testing.T) {
|
||||
}
|
||||
s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: gws}
|
||||
s.registerAccount(s.gacc)
|
||||
c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription), nc: cli}
|
||||
c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription), nc: cli}
|
||||
|
||||
subop := []byte("SUB foo 1\r\n")
|
||||
subop1 := subop[:6]
|
||||
@@ -67,7 +67,7 @@ func TestSplitBufferSubOp(t *testing.T) {
|
||||
func TestSplitBufferUnsubOp(t *testing.T) {
|
||||
s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: &srvGateway{}}
|
||||
s.registerAccount(s.gacc)
|
||||
c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription)}
|
||||
c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)}
|
||||
|
||||
subop := []byte("SUB foo 1024\r\n")
|
||||
if err := c.parse(subop); err != nil {
|
||||
@@ -100,7 +100,7 @@ func TestSplitBufferUnsubOp(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSplitBufferPubOp(t *testing.T) {
|
||||
c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)}
|
||||
c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)}
|
||||
pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r")
|
||||
pub1 := pub[:2]
|
||||
pub2 := pub[2:9]
|
||||
@@ -166,7 +166,7 @@ func TestSplitBufferPubOp(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSplitBufferPubOp2(t *testing.T) {
|
||||
c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)}
|
||||
c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)}
|
||||
pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r\n")
|
||||
pub1 := pub[:30]
|
||||
pub2 := pub[30:]
|
||||
@@ -186,7 +186,7 @@ func TestSplitBufferPubOp2(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSplitBufferPubOp3(t *testing.T) {
|
||||
c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)}
|
||||
c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)}
|
||||
pubAll := []byte("PUB foo bar 11\r\nhello world\r\n")
|
||||
pub := pubAll[:16]
|
||||
|
||||
@@ -212,7 +212,7 @@ func TestSplitBufferPubOp3(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSplitBufferPubOp4(t *testing.T) {
|
||||
c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)}
|
||||
c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)}
|
||||
pubAll := []byte("PUB foo 11\r\nhello world\r\n")
|
||||
pub := pubAll[:12]
|
||||
|
||||
@@ -238,7 +238,7 @@ func TestSplitBufferPubOp4(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSplitBufferPubOp5(t *testing.T) {
|
||||
c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)}
|
||||
c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)}
|
||||
pubAll := []byte("PUB foo 11\r\nhello world\r\n")
|
||||
|
||||
// Splits need to be on MSG_END_R now too, so make sure we check that.
|
||||
@@ -257,7 +257,7 @@ func TestSplitBufferPubOp5(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSplitConnectArg(t *testing.T) {
|
||||
c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)}
|
||||
c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)}
|
||||
connectAll := []byte("CONNECT {\"verbose\":false,\"tls_required\":false," +
|
||||
"\"user\":\"test\",\"pedantic\":true,\"pass\":\"pass\"}\r\n")
|
||||
|
||||
@@ -306,7 +306,7 @@ func TestSplitConnectArg(t *testing.T) {
|
||||
|
||||
func TestSplitDanglingArgBuf(t *testing.T) {
|
||||
s := New(&defaultServerOptions)
|
||||
c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription)}
|
||||
c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)}
|
||||
|
||||
// We test to make sure we do not dangle any argBufs after processing
|
||||
// since that could lead to performance issues.
|
||||
@@ -445,7 +445,7 @@ func TestSplitRoutedMsgArg(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSplitBufferMsgOp(t *testing.T) {
|
||||
c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription), kind: ROUTER}
|
||||
c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription), kind: ROUTER}
|
||||
msg := []byte("RMSG $G foo.bar _INBOX.22 11\r\nhello world\r")
|
||||
msg1 := msg[:2]
|
||||
msg2 := msg[2:9]
|
||||
|
||||
Reference in New Issue
Block a user