Changes to prevent fan-in scenarios from triggering slow consumer state

Signed-off-by: Derek Collison <derek@nats.io>
Derek Collison
2019-01-10 15:56:04 -08:00
parent 6b2b4d9baf
commit e3d19ef698
3 changed files with 220 additions and 26 deletions

View File

@@ -198,13 +198,14 @@ type outbound struct {
s []byte // Secondary for use post flush
nb net.Buffers // net.Buffers for writev IO
sz int // limit size per []byte, uses variable BufSize constants, start, min, max.
sws int // Number of short writes, used for dyanmic resizing.
sws int // Number of short writes, used for dynamic resizing.
pb int64 // Total pending/queued bytes.
pm int64 // Total pending/queued messages.
sg *sync.Cond // Flusher conditional for signaling.
fsp int // Flush signals that are pending from readLoop's pcd.
mp int64 // snapshot of max pending.
wdl time.Duration // Snapshot fo write deadline.
mp int64 // snapshot of max pending.
fsp int // Flush signals that are pending from readLoop's pcd.
lws int64 // Last write size
lft time.Duration // Last flush time.
}
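For context, the sws counter is described as driving dynamic buffer resizing, while the new lws and lft fields record the last write size and last flush time. Below is a small, self-contained sketch (not part of this commit) of how short-write counting and the last write size could drive that kind of resizing; the thresholds and type names are hypothetical:

```go
// Illustrative sketch only: shows how short-write counts (sws) and the last
// write size (lws) might grow or shrink a write buffer between flushes.
// Thresholds and names are hypothetical, not the server's values.
package main

import "fmt"

const (
	minBufSize = 512
	maxBufSize = 65536
)

type outboundSketch struct {
	sz  int   // current per-buffer size limit
	sws int   // consecutive short writes observed
	lws int64 // size of the last write
}

// adjustBufSize grows the buffer after a full write and shrinks it after a
// run of short writes, staying within [minBufSize, maxBufSize].
func (o *outboundSketch) adjustBufSize() {
	switch {
	case o.lws >= int64(o.sz) && o.sz < maxBufSize:
		o.sz *= 2 // keeping up: allow larger batches
		o.sws = 0
	case o.sws > 3 && o.sz > minBufSize:
		o.sz /= 2 // repeated short writes: back off
		o.sws = 0
	}
}

func main() {
	o := &outboundSketch{sz: 4096, lws: 4096}
	o.adjustBufSize()
	fmt.Println("new buffer size:", o.sz)
}
```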
@@ -706,7 +707,7 @@ func (c *client) readLoop() {
// Budget to spend in place flushing outbound data.
// Client will be checked on several fronts to see
// if applicable. Routes will never wait in place.
budget := 500 * time.Microsecond
budget := time.Millisecond
if c.kind == ROUTER {
budget = 0
}
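The budget above bounds how long a reader will spend flushing in place before leaving the rest to the dedicated flusher, with routes given no budget so they never block the read path. The sketch below is illustrative only and not the server's readLoop; the function names and the simulated IO are made up:

```go
// Sketch of a per-iteration time budget for in-place flushing.
package main

import (
	"fmt"
	"time"
)

func flushBudget(isRouter bool) time.Duration {
	if isRouter {
		return 0 // routes never wait in place
	}
	return time.Millisecond
}

// tryFlushInPlace flushes only while the budget lasts; once it is spent,
// remaining work is left for a dedicated flusher.
func tryFlushInPlace(budget time.Duration, flush func() bool) {
	deadline := time.Now().Add(budget)
	for budget > 0 && time.Now().Before(deadline) {
		if done := flush(); done {
			return
		}
	}
}

func main() {
	calls := 0
	tryFlushInPlace(flushBudget(false), func() bool {
		calls++
		time.Sleep(200 * time.Microsecond) // pretend IO
		return calls >= 3
	})
	fmt.Println("in-place flush calls:", calls)
}
```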
@@ -810,8 +811,18 @@ func (c *client) flushOutbound() bool {
attempted := c.out.pb
apm := c.out.pm
// Do NOT hold lock during actual IO
c.mu.Unlock()
// What we are doing here is seeing if we are getting behind. This is
// generally not a gradual thing and will spike quickly. Use some basic
// logic to try to understand when this is happening through no fault of
// our own. How we attempt to get back into a more balanced state under
// load will be to hold our lock during IO, forcing others to wait and
// applying back pressure to the publishers sending to us.
releaseLock := c.out.pb < maxBufSize*4
// Do NOT hold lock during actual IO unless we are behind
if releaseLock {
c.mu.Unlock()
}
// flush here
now := time.Now()
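To make the back-pressure idea above concrete, here is a minimal standalone sketch of the conditional lock-release pattern; the type, the write callback, and the maxBufSizeSketch limit are illustrative, not the server's code:

```go
// When the pending backlog is small, release the lock during the write so
// producers are not blocked; when the backlog spikes, keep the lock held so
// producers stall on it and naturally slow down.
package main

import (
	"fmt"
	"sync"
)

const maxBufSizeSketch = 65536

type conn struct {
	mu      sync.Mutex
	pending int64 // queued bytes awaiting a write
}

func (c *conn) flush(write func([]byte) (int, error), buf []byte) {
	c.mu.Lock()
	// Only drop the lock for the IO when we are not badly behind.
	releaseLock := c.pending < maxBufSizeSketch*4
	if releaseLock {
		c.mu.Unlock()
	}
	n, _ := write(buf)
	if releaseLock {
		c.mu.Lock()
	}
	c.pending -= int64(n)
	c.mu.Unlock()
}

func main() {
	c := &conn{pending: 1024}
	c.flush(func(b []byte) (int, error) { return len(b), nil }, make([]byte, 1024))
	fmt.Println("pending after flush:", c.pending)
}
```

Keeping the lock held while behind means producers block on the same mutex they need to queue more data, which is what applies the back pressure.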
@@ -823,15 +834,18 @@ func (c *client) flushOutbound() bool {
nc.SetWriteDeadline(time.Time{})
lft := time.Since(now)
// Re-acquire client lock
c.mu.Lock()
// Re-acquire client lock if we let it go during IO
if releaseLock {
c.mu.Lock()
}
// Update flush time statistics
// Update flush time and size statistics
c.out.lft = lft
c.out.lws = n
// Subtract from pending bytes and messages.
c.out.pb -= n
c.out.pm -= apm // FIXME(dlc) - this will not be accurate.
c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate.
// Check for partial writes
if n != attempted && n > 0 {
@@ -860,7 +874,7 @@ func (c *client) flushOutbound() bool {
// Under some conditions, a client may hit a slow consumer write deadline
// before the authorization or TLS handshake timeout. If that is the case,
// then we handle as slow consumer though we do not increase the counter
// as can be misleading.
// as that can be misleading.
c.clearConnection(SlowConsumerWriteDeadline)
sce = false
}
@@ -904,6 +918,7 @@ func (c *client) flushOutbound() bool {
}
}
}
return true
}
@@ -1202,6 +1217,11 @@ func (c *client) maxPayloadViolation(sz int, max int32) {
// should not reuse the `data` array.
// Lock should be held.
func (c *client) queueOutbound(data []byte) bool {
// Do not keep going if closed or cleared via a slow consumer
if c.flags.isSet(clearConnection) {
return false
}
// Assume data will not be referenced
referenced := false
// Add to pending bytes total.
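A minimal sketch of the early-return guard added above: once a connection has been flagged as cleared (for example after a slow consumer), further outbound data is refused rather than buffered. The flag and type names here are illustrative, not the server's:

```go
// Guard-clause sketch: stop queueing once the connection is being torn down.
package main

import "fmt"

type clientSketch struct {
	cleared bool
	pending [][]byte
}

// queueOutbound returns false without buffering when the connection is
// already cleared, so pending buffers stop growing.
func (c *clientSketch) queueOutbound(data []byte) bool {
	if c.cleared {
		return false
	}
	c.pending = append(c.pending, data)
	return true
}

func main() {
	c := &clientSketch{cleared: true}
	fmt.Println("queued:", c.queueOutbound([]byte("hello"))) // queued: false
}
```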
@@ -1957,6 +1977,9 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
client.out.pm++
// Check outbound threshold and queue IO flush if needed.
// This is specifically looking at situations where we are getting behind and may want
// to intervene before this producer goes back to top of readloop. We are in the producer's
// readloop go routine at this point.
if client.out.pm > 1 && client.out.pb > maxBufSize*2 {
client.flushSignal()
}
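The threshold-and-signal pattern described in the comment above can be sketched roughly as follows; the flushThreshold value and type names are made up for illustration and are not the server's internals:

```go
// A producer that notices the outbound backlog crossing a threshold wakes the
// flusher instead of waiting for its own read loop to come back around.
package main

import (
	"fmt"
	"sync"
)

const flushThreshold = 65536 * 2

type outboundState struct {
	mu      sync.Mutex
	cond    *sync.Cond
	pending int64
	msgs    int64
}

func (o *outboundState) deliver(sz int64) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.pending += sz
	o.msgs++
	// Getting behind: wake the flusher early.
	if o.msgs > 1 && o.pending > flushThreshold {
		o.cond.Signal()
	}
}

func main() {
	o := &outboundState{}
	o.cond = sync.NewCond(&o.mu)
	done := make(chan struct{})
	go func() {
		o.mu.Lock()
		for o.pending <= flushThreshold {
			o.cond.Wait()
		}
		fmt.Println("flusher woken with pending bytes:", o.pending)
		o.mu.Unlock()
		close(done)
	}()
	for i := 0; i < 3; i++ {
		o.deliver(65536)
	}
	<-done
}
```

Signaling from the producer's path means pending data can be flushed before the producer's read loop returns to its top.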

View File

@@ -68,7 +68,7 @@ const (
MAX_PAYLOAD_SIZE = (1024 * 1024)
// MAX_PENDING_SIZE is the maximum outbound pending bytes per client.
MAX_PENDING_SIZE = (256 * 1024 * 1024)
MAX_PENDING_SIZE = (64 * 1024 * 1024)
// DEFAULT_MAX_CONNECTIONS is the default maximum connections allowed.
DEFAULT_MAX_CONNECTIONS = (64 * 1024)
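Lowering MAX_PENDING_SIZE caps how much outbound data can be queued per client before it is handled as a slow consumer. A rough, illustrative sketch of a pending-bytes cap follows; the type, error, and field names are hypothetical, not the server's API:

```go
// Once the bytes queued for a connection exceed its max-pending snapshot, the
// connection is reported as a slow consumer rather than buffering unbounded.
package main

import (
	"errors"
	"fmt"
)

const maxPendingSize = 64 * 1024 * 1024 // 64MB, matching the new default above

var errSlowConsumer = errors.New("slow consumer: max pending exceeded")

type outboundPending struct {
	pb int64 // pending bytes queued for the connection
	mp int64 // snapshot of the max pending limit
}

// queue adds data to the pending total and reports a slow consumer once the
// limit is crossed.
func (o *outboundPending) queue(data []byte) error {
	o.pb += int64(len(data))
	if o.pb > o.mp {
		return errSlowConsumer
	}
	return nil
}

func main() {
	o := &outboundPending{pb: maxPendingSize, mp: maxPendingSize} // already at the cap
	fmt.Println(o.queue([]byte("x")))
}
```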

View File

@@ -11,6 +11,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Please note that these tests will stress a system and they need generous
// amounts of CPU, Memory and network sockets. Make sure the 'open files'
// setting for your platform is at least 8192. On linux and MacOSX you can
// do this via 'ulimit -n 8192'
//
package test
import (
@@ -132,7 +138,7 @@ func Benchmark______Pub8K_Payload(b *testing.B) {
benchPub(b, psub, s)
}
func Benchmark______Pub32K_Payload(b *testing.B) {
func Benchmark_____Pub32K_Payload(b *testing.B) {
b.StopTimer()
s := sizedString(32 * 1024)
benchPub(b, psub, s)
@@ -647,21 +653,23 @@ func doFanout(b *testing.B, numServers, numConnections, subsPerConnection int, s
// Publish Connection
c := createClientConn(b, o1.Host, o1.Port)
doDefaultConnect(b, c)
bw := bufio.NewWriterSize(c, defaultSendBufSize)
sendOp := []byte(fmt.Sprintf("PUB %s %d\r\n%s\r\n", subject, len(payload), payload))
flushConnection(b, c)
bw := bufio.NewWriterSize(c, defaultSendBufSize)
sendOp := []byte(fmt.Sprintf("PUB %s %d\r\n%s\r\n", subject, len(payload), payload))
b.SetBytes(int64(len(sendOp) + (len(msgOp) * numConnections * subsPerConnection)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := bw.Write(sendOp)
if err != nil {
b.Errorf("Received error on PUB write: %v\n", err)
b.Fatalf("Received error on PUB write: %v\n", err)
}
}
err := bw.Flush()
if err != nil {
b.Errorf("Received error on FLUSH write: %v\n", err)
b.Fatalf("Received error on FLUSH write: %v\n", err)
}
// Wait for connections to be drained
@@ -674,14 +682,6 @@ func doFanout(b *testing.B, numServers, numConnections, subsPerConnection int, s
var sub = "x"
var payload = "12345678"
func Benchmark___FanOut_512x1kx1k(b *testing.B) {
doFanout(b, 1, 1000, 1000, sub, sizedString(512))
}
func Benchmark__FanOut_8x1000x100(b *testing.B) {
doFanout(b, 1, 1000, 100, sub, payload)
}
func Benchmark______FanOut_8x1x10(b *testing.B) {
doFanout(b, 1, 1, 10, sub, payload)
}
@@ -710,10 +710,54 @@ func Benchmark__FanOut_8x10x10000(b *testing.B) {
doFanout(b, 1, 10, 10000, sub, payload)
}
func Benchmark__FanOut_1kx10x1000(b *testing.B) {
func Benchmark___FanOut_8x500x100(b *testing.B) {
doFanout(b, 1, 500, 100, sub, payload)
}
func Benchmark___FanOut_128x1x100(b *testing.B) {
doFanout(b, 1, 1, 100, sub, sizedString(128))
}
func Benchmark__FanOut_128x10x100(b *testing.B) {
doFanout(b, 1, 10, 100, sub, sizedString(128))
}
func Benchmark_FanOut_128x10x1000(b *testing.B) {
doFanout(b, 1, 10, 1000, sub, sizedString(128))
}
func Benchmark_FanOut_128x100x100(b *testing.B) {
doFanout(b, 1, 100, 100, sub, sizedString(128))
}
func BenchmarkFanOut_128x100x1000(b *testing.B) {
doFanout(b, 1, 100, 1000, sub, sizedString(128))
}
func BenchmarkFanOut_128x10x10000(b *testing.B) {
doFanout(b, 1, 10, 10000, sub, sizedString(128))
}
func BenchmarkFanOut__128x500x100(b *testing.B) {
doFanout(b, 1, 500, 100, sub, sizedString(128))
}
func Benchmark_FanOut_512x100x100(b *testing.B) {
doFanout(b, 1, 100, 100, sub, sizedString(512))
}
func Benchmark__FanOut_512x100x1k(b *testing.B) {
doFanout(b, 1, 100, 1000, sub, sizedString(512))
}
func Benchmark____FanOut_1kx10x1k(b *testing.B) {
doFanout(b, 1, 10, 1000, sub, sizedString(1024))
}
func Benchmark__FanOut_1kx100x100(b *testing.B) {
doFanout(b, 1, 100, 100, sub, sizedString(1024))
}
func Benchmark_____RFanOut_8x1x10(b *testing.B) {
doFanout(b, 2, 1, 10, sub, payload)
}
@@ -745,3 +789,130 @@ func Benchmark_RFanOut_8x10x10000(b *testing.B) {
func Benchmark_RFanOut_1kx10x1000(b *testing.B) {
doFanout(b, 2, 10, 1000, sub, sizedString(1024))
}
func doFanIn(b *testing.B, numServers, numPublishers, numSubscribers int, subject, payload string) {
b.Helper()
if b.N < numPublishers {
return
}
if numSubscribers > numPublishers {
b.Fatalf("Fan in tests should have numPublishers (%d) > numSubscribers (%d)", numPublishers, numSubscribers)
}
if numSubscribers > 10 {
b.Fatalf("numSubscribers should be <= 10")
}
if b.N%numPublishers != 0 {
b.Fatalf("b.N (%d) / numPublishers (%d) has a remainder", b.N, numPublishers)
}
var s1, s2 *server.Server
var o1, o2 *server.Options
switch numServers {
case 1:
s1, o1 = RunServerWithConfig("./configs/srv_a.conf")
defer s1.Shutdown()
s2, o2 = s1, o1
case 2:
s1, o1 = RunServerWithConfig("./configs/srv_a.conf")
defer s1.Shutdown()
s2, o2 = RunServerWithConfig("./configs/srv_b.conf")
defer s2.Shutdown()
default:
b.Fatalf("%d servers not supported for this test\n", numServers)
}
msgOp := fmt.Sprintf("MSG %s %d %d\r\n%s\r\n", subject, 9, len(payload), payload)
expected := len(msgOp) * b.N
// Client connections and subscriptions. For fan in these are smaller than numPublishers.
clients := make([]chan bool, 0, numSubscribers)
for i := 0; i < numSubscribers; i++ {
c := createClientConn(b, o2.Host, o2.Port)
doDefaultConnect(b, c)
defer c.Close()
ch := make(chan bool)
clients = append(clients, ch)
subOp := fmt.Sprintf("SUB %s %d\r\n", subject, i)
sendProto(b, c, subOp)
flushConnection(b, c)
go drainConnection(b, c, ch, expected)
}
sendOp := []byte(fmt.Sprintf("PUB %s %d\r\n%s\r\n", subject, len(payload), payload))
startCh := make(chan bool)
l := b.N / numPublishers
pubLoop := func(c net.Conn, ch chan bool) {
bw := bufio.NewWriterSize(c, defaultSendBufSize)
// Signal we are ready
close(ch)
// Wait to start up actual sends.
<-startCh
for i := 0; i < l; i++ {
_, err := bw.Write(sendOp)
if err != nil {
b.Errorf("Received error on PUB write: %v\n", err)
return
}
}
err := bw.Flush()
if err != nil {
b.Errorf("Received error on FLUSH write: %v\n", err)
return
}
}
// Publish Connections SPINUP
for i := 0; i < numPublishers; i++ {
c := createClientConn(b, o1.Host, o1.Port)
doDefaultConnect(b, c)
flushConnection(b, c)
ch := make(chan bool)
go pubLoop(c, ch)
<-ch
}
b.SetBytes(int64(len(sendOp) + len(msgOp)))
b.ResetTimer()
// Closing this will start all publishers at once (roughly)
close(startCh)
// Wait for connections to be drained
for i := 0; i < len(clients); i++ {
<-clients[i]
}
b.StopTimer()
}
func Benchmark_____FanIn_1kx100x1(b *testing.B) {
doFanIn(b, 1, 100, 1, sub, sizedString(1024))
}
func Benchmark_____FanIn_4kx100x1(b *testing.B) {
doFanIn(b, 1, 100, 1, sub, sizedString(4096))
}
func Benchmark_____FanIn_8kx100x1(b *testing.B) {
doFanIn(b, 1, 100, 1, sub, sizedString(8192))
}
func Benchmark____FanIn_16kx100x1(b *testing.B) {
doFanIn(b, 1, 100, 1, sub, sizedString(16384))
}
func Benchmark____FanIn_64kx100x1(b *testing.B) {
doFanIn(b, 1, 100, 1, sub, sizedString(65536))
}
func Benchmark___FanIn_128kx100x1(b *testing.B) {
doFanIn(b, 1, 100, 1, sub, sizedString(65536*2))
}