mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] MaxPending > MaxInt32 causes client to be disconnected
Changed some of client.outbound fields to int64. Moved fields around to minimize size of struct (checked with unsafe.Sizeof()) Checked benchmark results before/after Added test Resolves #1118 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -219,15 +219,15 @@ type outbound struct {
|
||||
nb net.Buffers // net.Buffers for writev IO
|
||||
sz int32 // limit size per []byte, uses variable BufSize constants, start, min, max.
|
||||
sws int32 // Number of short writes, used for dynamic resizing.
|
||||
pb int32 // Total pending/queued bytes.
|
||||
pb int64 // Total pending/queued bytes.
|
||||
pm int32 // Total pending/queued messages.
|
||||
fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
|
||||
sg *sync.Cond // Flusher conditional for signaling to writeLoop.
|
||||
wdl time.Duration // Snapshot of write deadline.
|
||||
mp int32 // Snapshot of max pending for client.
|
||||
fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
|
||||
mp int64 // Snapshot of max pending for client.
|
||||
lft time.Duration // Last flush time for Write.
|
||||
lwb int32 // Last byte size of Write.
|
||||
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
|
||||
lwb int32 // Last byte size of Write.
|
||||
sgw bool // Indicate flusher is waiting on condition wait.
|
||||
}
|
||||
|
||||
@@ -417,7 +417,7 @@ func (c *client) initClient() {
|
||||
opts := s.getOpts()
|
||||
// Snapshots to avoid mutex access in fast paths.
|
||||
c.out.wdl = opts.WriteDeadline
|
||||
c.out.mp = int32(opts.MaxPending)
|
||||
c.out.mp = opts.MaxPending
|
||||
|
||||
c.subs = make(map[string]*subscription)
|
||||
c.echo = true
|
||||
@@ -985,12 +985,12 @@ func (c *client) flushOutbound() bool {
|
||||
c.out.lwb = int32(n)
|
||||
|
||||
// Subtract from pending bytes and messages.
|
||||
c.out.pb -= c.out.lwb
|
||||
c.out.pb -= int64(c.out.lwb)
|
||||
c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate on partials.
|
||||
|
||||
// Check for partial writes
|
||||
// TODO(dlc) - zero write with no error will cause lost message and the writeloop to spin.
|
||||
if c.out.lwb != attempted && n > 0 {
|
||||
if int64(c.out.lwb) != attempted && n > 0 {
|
||||
c.handlePartialWrite(nb)
|
||||
} else if c.out.lwb >= c.out.sz {
|
||||
c.out.sws = 0
|
||||
@@ -1034,18 +1034,18 @@ func (c *client) flushOutbound() bool {
|
||||
}
|
||||
|
||||
// Adjust based on what we wrote plus any pending.
|
||||
pt := c.out.lwb + c.out.pb
|
||||
pt := int64(c.out.lwb) + c.out.pb
|
||||
|
||||
// Adjust sz as needed downward, keeping power of 2.
|
||||
// We do this at a slower rate.
|
||||
if pt < c.out.sz && c.out.sz > minBufSize {
|
||||
if pt < int64(c.out.sz) && c.out.sz > minBufSize {
|
||||
c.out.sws++
|
||||
if c.out.sws > shortsToShrink {
|
||||
c.out.sz >>= 1
|
||||
}
|
||||
}
|
||||
// Adjust sz as needed upward, keeping power of 2.
|
||||
if pt > c.out.sz && c.out.sz < maxBufSize {
|
||||
if pt > int64(c.out.sz) && c.out.sz < maxBufSize {
|
||||
c.out.sz <<= 1
|
||||
}
|
||||
|
||||
@@ -1070,7 +1070,7 @@ func (c *client) flushOutbound() bool {
|
||||
|
||||
// Check if we have a stalled gate and if so and we are recovering release
|
||||
// any stalled producers. Only kind==CLIENT will stall.
|
||||
if c.out.stc != nil && (c.out.lwb == attempted || c.out.pb < c.out.mp/2) {
|
||||
if c.out.stc != nil && (int64(c.out.lwb) == attempted || c.out.pb < c.out.mp/2) {
|
||||
close(c.out.stc)
|
||||
c.out.stc = nil
|
||||
}
|
||||
@@ -1406,7 +1406,7 @@ func (c *client) queueOutbound(data []byte) bool {
|
||||
// Assume data will not be referenced
|
||||
referenced := false
|
||||
// Add to pending bytes total.
|
||||
c.out.pb += int32(len(data))
|
||||
c.out.pb += int64(len(data))
|
||||
|
||||
// Check for slow consumer via pending bytes limit.
|
||||
// ok to return here, client is going away.
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"net"
|
||||
"reflect"
|
||||
"regexp"
|
||||
@@ -1348,3 +1349,17 @@ func TestTraceMsg(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientMaxPending(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.MaxPending = math.MaxInt32 + 1
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
||||
defer nc.Close()
|
||||
|
||||
sub := natsSubSync(t, nc, "foo")
|
||||
natsPub(t, nc, "foo", []byte("msg"))
|
||||
natsNexMsg(t, sub, 100*time.Millisecond)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user