Mirror of https://github.com/gogrlx/nats-server.git
Synced 2026-04-14 10:10:42 -07:00
New fan-in logic
Reworked fan-in logic. We do not hold locks during IO, either read or write. In scenarios where we can get behind, mostly due to fan-in from fast producers, we detect this and create a stall channel. Once we catch up, we close the stall channel to release all blocked producers. Producers have an upper bound on how long they will be stalled.

Signed-off-by: Derek Collison <derek@nats.io>
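The mechanism is easiest to see in isolation. Below is a minimal, hypothetical sketch of the stall-channel pattern, not the server code itself: the consumer type and its queue/flush methods are illustrative assumptions, while the half-of-max-pending trigger, the close-to-release step, and the 100ms upper bound mirror queueOutbound, flushOutbound, and stalledWait in the diff that follows.

// stall_sketch.go — illustrative only; not part of this commit.
package main

import (
    "fmt"
    "sync"
    "time"
)

// consumer is an assumed stand-in for a client's outbound state.
type consumer struct {
    mu      sync.Mutex
    pending int           // queued bytes not yet flushed to the socket
    max     int           // max pending; above max/2 we gate producers
    stc     chan struct{} // stall gate; nil while the consumer is healthy
}

// queue mimics queueOutbound + stalledWait: create the gate when we fall
// more than half of max behind, then wait on it outside the lock with an
// upper bound so a producer is never stalled indefinitely.
func (c *consumer) queue(n int) {
    c.mu.Lock()
    c.pending += n
    if c.pending > c.max/2 && c.stc == nil {
        c.stc = make(chan struct{})
    }
    stall := c.stc
    c.mu.Unlock()

    if stall != nil {
        select {
        case <-stall: // released when the consumer catches up
        case <-time.After(100 * time.Millisecond):
            fmt.Println("timed out of stall")
        }
    }
}

// flush mimics flushOutbound: once we recover below half of max, closing
// the gate releases every blocked producer at once.
func (c *consumer) flush(n int) {
    c.mu.Lock()
    c.pending -= n
    if c.stc != nil && c.pending < c.max/2 {
        close(c.stc)
        c.stc = nil
    }
    c.mu.Unlock()
}

func main() {
    c := &consumer{max: 8}
    go func() {
        time.Sleep(10 * time.Millisecond)
        c.flush(5) // consumer recovers and releases the gate
    }()
    c.queue(5) // crosses max/2, so this producer stalls until flush runs
    fmt.Println("producer released")
}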
server/client.go | 118
@@ -67,7 +67,6 @@ const (
     startBufSize = 512 // For INFO/CONNECT block
     minBufSize = 64 // Smallest to shrink to for PING/PONG
     maxBufSize = 65536 // 64k
-    backedUpSize = maxBufSize * 4
     shortsToShrink = 2
 )

@@ -207,8 +206,9 @@ type outbound struct {
     wdl time.Duration // Snapshot of write deadline.
     mp  int32         // Snapshot of max pending for client.
     fsp int32         // Flush signals that are pending per producer from readLoop's pcd.
-    lft time.Duration // Last flush time.
-    lwb int32         // Last byte size of Write().
+    lft time.Duration // Last flush time for Write.
+    lwb int32         // Last byte size of Write.
+    stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
     sgw bool          // Indicate flusher is waiting on condition wait.
 }

@@ -266,7 +266,6 @@ type readCache struct {
     msgs  int32
     bytes int32
     subs  int32
-    psd   time.Duration // Possible stall duration for fast producers.

     rsz int32 // Read buffer size
     srs int32 // Short reads, used for dynamic buffer resizing.
@@ -656,50 +655,6 @@ func (c *client) writeLoop() {
     }
 }

-// Will determine if a client connection is considered behind due to outbound buffering. This will
-// allow us to react and try to stabilize bursty fast producers or fan in scenarios.
-// Assume lock is held.
-func (c *client) outboundBackedUp() bool {
-    return c.out.pb > backedUpSize && c.out.pm > 1
-}
-
-// Given a raw estimated delay we will box with a floor and ceiling.
-func (c *client) boxedDelay(psd time.Duration) time.Duration {
-    // Ceiling at 250ms.
-    if psd > 250*time.Millisecond {
-        c.Debugf("Stall delay of %v reset to max of %v", psd, 250*time.Millisecond)
-        psd = 250 * time.Millisecond
-    }
-    return psd
-}
-
-// This determines how much of the pending backlog for this client
-// we should consider to be responsible for in our fast producer
-// stall delay.
-// Assume lock is held.
-func (c *client) percentBacklog() int32 {
-    // If behind 50.0% of our max, wait for 100%.
-    // If behind 12.5% of our max, wait for 33%
-    // If behind 6.25% of our max, wait for 25%
-    // Otherwise wait 20%
-    // We also note how many others are writing.
-    // This will be at least one, but we dampen by half
-    fsp := c.out.fsp / 2
-    if fsp == 0 {
-        fsp = 1
-    }
-    switch {
-    case c.out.pb > c.out.mp/2:
-        return c.out.pb / 1 / fsp
-    case c.out.pb > c.out.mp/8:
-        return c.out.pb / 3 / fsp
-    case c.out.pb > c.out.mp/16:
-        return c.out.pb / 4 / fsp
-    default:
-        return c.out.pb / 5 / fsp
-    }
-}
-
 // flushClients will make sure to flush any clients we may have
 // sent to during processing. We pass in a budget as a time.Duration
 // for how much time to spend in place flushing for this client. This
@@ -725,18 +680,6 @@ func (c *client) flushClients(budget time.Duration) time.Time {
             continue
         }

-        // Check if this client is backed up and if so calculate any delay intention.
-        // We only delay clients, not routes, gateways etc.
-        if c.kind == CLIENT && cp.outboundBackedUp() {
-            td := cp.percentBacklog()
-            delay := time.Duration(td) * cp.out.lft / time.Duration(cp.out.lwb)
-            c.in.psd += delay
-            // Attempt to flush in place if we can. If we do subtract off of our potential stall delay.
-            if cp.flushOutbound() {
-                c.in.psd -= cp.out.lft
-            }
-        }
-
         if budget > 0 && cp.flushOutbound() {
             budget -= cp.out.lft
         } else {
@@ -793,9 +736,6 @@ func (c *client) readLoop() {
         c.in.bytes = 0
         c.in.subs = 0

-        // Clear possible stall delay factor.
-        c.in.psd = 0
-
         // Main call into parser for inbound data. This will generate callouts
         // to process messages, etc.
         if err := c.parse(b[:n]); err != nil {
@@ -828,16 +768,6 @@
         // Flush, or signal to writeLoop to flush to socket.
         last := c.flushClients(budget)

-        // If we have detected clients that we are sending to are behind let's
-        // try to throttle a bit and see if they catch up. This allows servers to
-        // survive bursts and high fan-in scenarios. We do not hold our lock here
-        // so outbound is free to continue.
-        if psd := c.in.psd; psd > 25*time.Microsecond {
-            psd = c.boxedDelay(psd)
-            c.Debugf("Delaying %v due to fast producer", psd)
-            time.Sleep(psd)
-        }
-
         // Update activity, check read buffer size.
         c.mu.Lock()
         nc := c.nc
@@ -1025,6 +955,13 @@ func (c *client) flushOutbound() bool {
         c.flushSignal()
     }

+    // Check if we have a stalled gate and if so and we are recovering release
+    // any stalled producers. Only kind==CLIENT will stall.
+    if c.out.stc != nil && (c.out.lwb == attempted || c.out.pb < c.out.mp/2) {
+        close(c.out.stc)
+        c.out.stc = nil
+    }
+
     return true
 }

@@ -1401,6 +1338,14 @@ func (c *client) queueOutbound(data []byte) bool {
     } else {
         c.out.p = append(c.out.p, data...)
     }

+    // Check here if we should create a stall channel if we are falling behind.
+    // We do this here since if we wait for consumer's writeLoop it could be
+    // too late with large number of fan in producers.
+    if c.out.pb > c.out.mp/2 && c.out.stc == nil {
+        c.out.stc = make(chan struct{})
+    }
+
     return referenced
 }

@@ -1996,6 +1941,19 @@ func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte {
     return mh
 }

+func (c *client) stalledWait(producer *client) {
+    stall := c.out.stc
+    c.mu.Unlock()
+    defer c.mu.Lock()
+
+    // TODO(dlc) - Make the stall timeout variable based on health of consumer.
+    select {
+    case <-stall:
+    case <-time.After(100 * time.Millisecond):
+        producer.Debugf("Timed out of fast producer stall")
+    }
+}
+
 // Used to treat maps as efficient set
 var needFlush = struct{}{}

@@ -2075,6 +2033,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
         return true
     }

+    // If we are a client and we detect that the consumer we are
+    // sending to is in a stalled state, go ahead and wait here
+    // with a limit.
+    if c.kind == CLIENT && client.out.stc != nil {
+        client.stalledWait(c)
+    }
+
     // Check for closed connection
     if client.flags.isSet(clearConnection) {
         client.mu.Unlock()
@@ -2668,6 +2633,13 @@ func (c *client) clearConnection(reason ClosedState) {
     if nc == nil || srv == nil {
         return
     }

+    // Unblock anyone who is potentially stalled waiting on us.
+    if c.out.stc != nil {
+        close(c.out.stc)
+        c.out.stc = nil
+    }
+
     // Flush any pending.
     c.flushOutbound()

@@ -68,7 +68,7 @@ const (
     MAX_PAYLOAD_SIZE = (1024 * 1024)

     // MAX_PENDING_SIZE is the maximum outbound pending bytes per client.
-    MAX_PENDING_SIZE = (128 * 1024 * 1024)
+    MAX_PENDING_SIZE = (64 * 1024 * 1024)

     // DEFAULT_MAX_CONNECTIONS is the default maximum connections allowed.
     DEFAULT_MAX_CONNECTIONS = (64 * 1024)
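One design note on the release path above (flushOutbound and clearConnection): the gate is a chan struct{} that is closed rather than sent on, because a receive from a closed channel returns immediately, so a single close wakes every stalled producer at once, however many have piled up in a fan-in. A small, hypothetical demonstration:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    stall := make(chan struct{})
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            <-stall // a receive on a closed channel returns immediately
            fmt.Println("producer", id, "released")
        }(i)
    }
    close(stall) // one close releases all three blocked producers
    wg.Wait()
}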