mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
@@ -473,6 +473,7 @@ func (c *client) flushOutbound() bool {
|
||||
// Subtract from pending bytes.
|
||||
c.out.pb -= n
|
||||
|
||||
// Check for partial writes
|
||||
if n != attempted && n > 0 {
|
||||
c.handlePartialWrite(nb)
|
||||
}
|
||||
@@ -496,22 +497,22 @@ func (c *client) flushOutbound() bool {
|
||||
|
||||
// Adjust sz as needed downward, keeping power of 2.
|
||||
// We do this at a slower rate, hence the pt*4.
|
||||
for pt*4 < c.out.sz && c.out.sz > minBufSize {
|
||||
if pt*4 < c.out.sz && c.out.sz > minBufSize {
|
||||
c.out.sz >>= 1
|
||||
}
|
||||
// Adjust sz as needed upward, keeping power of 2.
|
||||
for pt > c.out.sz && c.out.sz < maxBufSize {
|
||||
if pt > c.out.sz && c.out.sz < maxBufSize {
|
||||
c.out.sz <<= 1
|
||||
}
|
||||
|
||||
// Check to see if we can reuse buffers.
|
||||
if len(cnb) > 0 {
|
||||
oldp := cnb[0][:0]
|
||||
if cap(oldp) == c.out.sz || cap(oldp) == c.out.sz*2 {
|
||||
if cap(oldp) >= c.out.sz {
|
||||
// Replace primary or secondary if they are nil, reusing same buffer.
|
||||
if c.out.p == nil {
|
||||
c.out.p = oldp
|
||||
} else if c.out.s == nil {
|
||||
} else if c.out.s == nil || cap(c.out.s) < c.out.sz {
|
||||
c.out.s = oldp
|
||||
}
|
||||
}
|
||||
@@ -714,6 +715,7 @@ func (c *client) queueOutbound(data []byte) {
|
||||
c.out.p = c.out.s
|
||||
c.out.s = nil
|
||||
} else {
|
||||
// FIXME(dlc) - make power of 2 if less than maxBufSize?
|
||||
c.out.p = make([]byte, 0, c.out.sz)
|
||||
}
|
||||
}
|
||||
@@ -732,6 +734,10 @@ func (c *client) queueOutbound(data []byte) {
|
||||
} else {
|
||||
// We will copy to primary.
|
||||
if c.out.p == nil {
|
||||
// Grow here
|
||||
if (c.out.sz << 1) <= maxBufSize {
|
||||
c.out.sz <<= 1
|
||||
}
|
||||
if len(data) > c.out.sz {
|
||||
c.out.p = make([]byte, 0, len(data))
|
||||
} else {
|
||||
|
||||
@@ -57,7 +57,7 @@ const (
|
||||
MAX_PAYLOAD_SIZE = (1024 * 1024)
|
||||
|
||||
// MAX_PENDING_SIZE is the maximum outbound pending bytes per client.
|
||||
MAX_PENDING_SIZE = (10 * 1024 * 1024)
|
||||
MAX_PENDING_SIZE = (64 * 1024 * 1024)
|
||||
|
||||
// DEFAULT_MAX_CONNECTIONS is the default maximum connections allowed.
|
||||
DEFAULT_MAX_CONNECTIONS = (64 * 1024)
|
||||
|
||||
@@ -573,6 +573,7 @@ func doFanout(b *testing.B, numServers, numConnections, subsPerConnection int, s
|
||||
flushConnection(b, c)
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := bw.Write(sendOp)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user