From effa30ce4af782406e91eb7c2df5779fecc69a53 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 11 Sep 2019 13:05:10 -0600 Subject: [PATCH] [FIXED] MaxPending > MaxInt32 causes client to be disconnected Changed some of client.outbound fields to int64. Moved fields around to minimize size of struct (checked with unsafe.Sizeof()) Checked benchmark results before/after Added test Resolves #1118 Signed-off-by: Ivan Kozlovic --- server/client.go | 24 ++++++++++++------------ server/client_test.go | 15 +++++++++++++++ 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/server/client.go b/server/client.go index 0681bb01..3ec26f5c 100644 --- a/server/client.go +++ b/server/client.go @@ -219,15 +219,15 @@ type outbound struct { nb net.Buffers // net.Buffers for writev IO sz int32 // limit size per []byte, uses variable BufSize constants, start, min, max. sws int32 // Number of short writes, used for dynamic resizing. - pb int32 // Total pending/queued bytes. + pb int64 // Total pending/queued bytes. pm int32 // Total pending/queued messages. + fsp int32 // Flush signals that are pending per producer from readLoop's pcd. sg *sync.Cond // Flusher conditional for signaling to writeLoop. wdl time.Duration // Snapshot of write deadline. - mp int32 // Snapshot of max pending for client. - fsp int32 // Flush signals that are pending per producer from readLoop's pcd. + mp int64 // Snapshot of max pending for client. lft time.Duration // Last flush time for Write. - lwb int32 // Last byte size of Write. stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in. + lwb int32 // Last byte size of Write. sgw bool // Indicate flusher is waiting on condition wait. } @@ -417,7 +417,7 @@ func (c *client) initClient() { opts := s.getOpts() // Snapshots to avoid mutex access in fast paths. c.out.wdl = opts.WriteDeadline - c.out.mp = int32(opts.MaxPending) + c.out.mp = opts.MaxPending c.subs = make(map[string]*subscription) c.echo = true @@ -985,12 +985,12 @@ func (c *client) flushOutbound() bool { c.out.lwb = int32(n) // Subtract from pending bytes and messages. - c.out.pb -= c.out.lwb + c.out.pb -= int64(c.out.lwb) c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate on partials. // Check for partial writes // TODO(dlc) - zero write with no error will cause lost message and the writeloop to spin. - if c.out.lwb != attempted && n > 0 { + if int64(c.out.lwb) != attempted && n > 0 { c.handlePartialWrite(nb) } else if c.out.lwb >= c.out.sz { c.out.sws = 0 @@ -1034,18 +1034,18 @@ func (c *client) flushOutbound() bool { } // Adjust based on what we wrote plus any pending. - pt := c.out.lwb + c.out.pb + pt := int64(c.out.lwb) + c.out.pb // Adjust sz as needed downward, keeping power of 2. // We do this at a slower rate. - if pt < c.out.sz && c.out.sz > minBufSize { + if pt < int64(c.out.sz) && c.out.sz > minBufSize { c.out.sws++ if c.out.sws > shortsToShrink { c.out.sz >>= 1 } } // Adjust sz as needed upward, keeping power of 2. - if pt > c.out.sz && c.out.sz < maxBufSize { + if pt > int64(c.out.sz) && c.out.sz < maxBufSize { c.out.sz <<= 1 } @@ -1070,7 +1070,7 @@ func (c *client) flushOutbound() bool { // Check if we have a stalled gate and if so and we are recovering release // any stalled producers. Only kind==CLIENT will stall. - if c.out.stc != nil && (c.out.lwb == attempted || c.out.pb < c.out.mp/2) { + if c.out.stc != nil && (int64(c.out.lwb) == attempted || c.out.pb < c.out.mp/2) { close(c.out.stc) c.out.stc = nil } @@ -1406,7 +1406,7 @@ func (c *client) queueOutbound(data []byte) bool { // Assume data will not be referenced referenced := false // Add to pending bytes total. - c.out.pb += int32(len(data)) + c.out.pb += int64(len(data)) // Check for slow consumer via pending bytes limit. // ok to return here, client is going away. diff --git a/server/client_test.go b/server/client_test.go index a723b7f3..0e30ffaa 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -18,6 +18,7 @@ import ( "bytes" "encoding/json" "fmt" + "math" "net" "reflect" "regexp" @@ -1348,3 +1349,17 @@ func TestTraceMsg(t *testing.T) { } } } + +func TestClientMaxPending(t *testing.T) { + opts := DefaultOptions() + opts.MaxPending = math.MaxInt32 + 1 + s := RunServer(opts) + defer s.Shutdown() + + nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + defer nc.Close() + + sub := natsSubSync(t, nc, "foo") + natsPub(t, nc, "foo", []byte("msg")) + natsNexMsg(t, sub, 100*time.Millisecond) +}