mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Move ints to proper sizes for all
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
// Copyright 2018 The NATS Authors
|
||||
// Copyright 2018-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -34,7 +34,7 @@ const globalAccountName = "$G"
|
||||
// Route Map Entry - used for efficient interest graph propagation.
|
||||
// TODO(dlc) - squeeze even more?
|
||||
type rme struct {
|
||||
qi int // used to index into key from map for optional queue name
|
||||
qi int32 // used to index into key from map for optional queue name
|
||||
n int32 // number of subscriptions directly matching, local subs only.
|
||||
}
|
||||
|
||||
@@ -50,15 +50,15 @@ type Account struct {
|
||||
sl *Sublist
|
||||
etmr *time.Timer
|
||||
ctmr *time.Timer
|
||||
strack map[string]int
|
||||
nrclients int
|
||||
sysclients int
|
||||
strack map[string]int32
|
||||
nrclients int32
|
||||
sysclients int32
|
||||
clients map[*client]*client
|
||||
rm map[string]*rme
|
||||
imports importMap
|
||||
exports exportMap
|
||||
limits
|
||||
nae int
|
||||
nae int32
|
||||
pruning bool
|
||||
expired bool
|
||||
srv *Server // server this account is registered with (possibly nil)
|
||||
@@ -67,9 +67,9 @@ type Account struct {
|
||||
// Account based limits.
|
||||
type limits struct {
|
||||
mpay int32
|
||||
msubs int
|
||||
mconns int
|
||||
maxnae int
|
||||
msubs int32
|
||||
mconns int32
|
||||
maxnae int32
|
||||
maxaettl time.Duration
|
||||
}
|
||||
|
||||
@@ -137,7 +137,7 @@ func (a *Account) shallowCopy() *Account {
|
||||
func (a *Account) NumConnections() int {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return len(a.clients) + a.nrclients
|
||||
return len(a.clients) + int(a.nrclients)
|
||||
}
|
||||
|
||||
// NumLocalClients returns active number of clients for this account
|
||||
@@ -150,7 +150,7 @@ func (a *Account) NumLocalConnections() int {
|
||||
|
||||
// Do not account for the system accounts.
|
||||
func (a *Account) numLocalConnections() int {
|
||||
return len(a.clients) - a.sysclients
|
||||
return len(a.clients) - int(a.sysclients)
|
||||
}
|
||||
|
||||
// MaxClientsReached returns if we have reached our limit for number of connections.
|
||||
@@ -162,7 +162,7 @@ func (a *Account) MaxTotalConnectionsReached() bool {
|
||||
|
||||
func (a *Account) maxTotalConnectionsReached() bool {
|
||||
if a.mconns != jwt.NoLimit {
|
||||
return len(a.clients)-a.sysclients+a.nrclients >= a.mconns
|
||||
return len(a.clients)-int(a.sysclients)+int(a.nrclients) >= int(a.mconns)
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -172,7 +172,7 @@ func (a *Account) maxTotalConnectionsReached() bool {
|
||||
func (a *Account) MaxActiveConnections() int {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return a.mconns
|
||||
return int(a.mconns)
|
||||
}
|
||||
|
||||
// RoutedSubs returns how many subjects we would send across a route when first
|
||||
@@ -315,7 +315,7 @@ func (a *Account) removeServiceImport(subject string) {
|
||||
func (a *Account) numAutoExpireResponseMaps() int {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return a.nae
|
||||
return int(a.nae)
|
||||
}
|
||||
|
||||
// MaxAutoExpireResponseMaps return the maximum number of
|
||||
@@ -323,14 +323,14 @@ func (a *Account) numAutoExpireResponseMaps() int {
|
||||
func (a *Account) MaxAutoExpireResponseMaps() int {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return a.maxnae
|
||||
return int(a.maxnae)
|
||||
}
|
||||
|
||||
// SetMaxAutoExpireResponseMaps sets the max outstanding auto expire response maps.
|
||||
func (a *Account) SetMaxAutoExpireResponseMaps(max int) {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
a.maxnae = max
|
||||
a.maxnae = int32(max)
|
||||
}
|
||||
|
||||
// AutoExpireTTL returns the ttl for response maps.
|
||||
@@ -415,7 +415,7 @@ func (a *Account) pruneAutoExpireResponseMaps() {
|
||||
}
|
||||
|
||||
a.mu.RLock()
|
||||
numOver := a.nae - a.maxnae
|
||||
numOver := int(a.nae - a.maxnae)
|
||||
a.mu.RUnlock()
|
||||
|
||||
if numOver <= 0 {
|
||||
@@ -931,9 +931,9 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
}
|
||||
|
||||
// Now do limits if they are present.
|
||||
a.msubs = int(ac.Limits.Subs)
|
||||
a.msubs = int32(ac.Limits.Subs)
|
||||
a.mpay = int32(ac.Limits.Payload)
|
||||
a.mconns = int(ac.Limits.Conn)
|
||||
a.mconns = int32(ac.Limits.Conn)
|
||||
|
||||
clients := gatherClients()
|
||||
// Sort if we are over the limit.
|
||||
@@ -943,7 +943,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
})
|
||||
}
|
||||
for i, c := range clients {
|
||||
if a.mconns != jwt.NoLimit && i >= a.mconns {
|
||||
if a.mconns != jwt.NoLimit && i >= int(a.mconns) {
|
||||
c.maxAccountConnExceeded()
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2018 The NATS Authors
|
||||
// Copyright 2012-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -145,8 +145,8 @@ type client struct {
|
||||
// Here first because of use of atomics, and memory alignment.
|
||||
stats
|
||||
mpay int32
|
||||
msubs int
|
||||
mcl int
|
||||
msubs int32
|
||||
mcl int32
|
||||
mu sync.Mutex
|
||||
kind int
|
||||
cid uint64
|
||||
@@ -160,7 +160,7 @@ type client struct {
|
||||
acc *Account
|
||||
user *NkeyUser
|
||||
host string
|
||||
port int
|
||||
port uint16
|
||||
subs map[string]*subscription
|
||||
perms *permissions
|
||||
mperms *msgDeny
|
||||
@@ -198,14 +198,14 @@ type outbound struct {
|
||||
p []byte // Primary write buffer
|
||||
s []byte // Secondary for use post flush
|
||||
nb net.Buffers // net.Buffers for writev IO
|
||||
sz int // limit size per []byte, uses variable BufSize constants, start, min, max.
|
||||
sws int // Number of short writes, used for dynamic resizing.
|
||||
pb int // Total pending/queued bytes.
|
||||
pm int // Total pending/queued messages.
|
||||
sz int32 // limit size per []byte, uses variable BufSize constants, start, min, max.
|
||||
sws int32 // Number of short writes, used for dynamic resizing.
|
||||
pb int32 // Total pending/queued bytes.
|
||||
pm int32 // Total pending/queued messages.
|
||||
sg *sync.Cond // Flusher conditional for signaling.
|
||||
wdl time.Duration // Snapshot fo write deadline.
|
||||
mp int // snapshot of max pending.
|
||||
fsp int // Flush signals that are pending from readLoop's pcd.
|
||||
mp int32 // snapshot of max pending.
|
||||
fsp int32 // Flush signals that are pending from readLoop's pcd.
|
||||
lft time.Duration // Last flush time.
|
||||
sgw bool // Indicate flusher is waiting on condition wait.
|
||||
}
|
||||
@@ -259,11 +259,14 @@ type readCache struct {
|
||||
rts []routeTarget
|
||||
|
||||
prand *rand.Rand
|
||||
msgs int
|
||||
bytes int
|
||||
subs int
|
||||
rsz int // Read buffer size
|
||||
srs int // Short reads, used for dynamic buffer resizing.
|
||||
|
||||
// These are all temporary totals for an invocation of a read in readloop.
|
||||
msgs int32
|
||||
bytes int32
|
||||
subs int32
|
||||
|
||||
rsz int32 // Read buffer size
|
||||
srs int32 // Short reads, used for dynamic buffer resizing.
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -354,7 +357,7 @@ func (c *client) initClient() {
|
||||
opts := s.getOpts()
|
||||
// Snapshots to avoid mutex access in fast paths.
|
||||
c.out.wdl = opts.WriteDeadline
|
||||
c.out.mp = int(opts.MaxPending)
|
||||
c.out.mp = int32(opts.MaxPending)
|
||||
|
||||
c.subs = make(map[string]*subscription)
|
||||
c.echo = true
|
||||
@@ -377,7 +380,7 @@ func (c *client) initClient() {
|
||||
if ip, ok := c.nc.(*net.TCPConn); ok {
|
||||
addr := ip.RemoteAddr().(*net.TCPAddr)
|
||||
c.host = addr.IP.String()
|
||||
c.port = addr.Port
|
||||
c.port = uint16(addr.Port)
|
||||
conn = fmt.Sprintf("%s:%d", addr.IP, addr.Port)
|
||||
}
|
||||
|
||||
@@ -448,12 +451,12 @@ func (c *client) registerWithAccount(acc *Account) error {
|
||||
|
||||
// Helper to determine if we have exceeded max subs.
|
||||
func (c *client) subsExceeded() bool {
|
||||
return c.msubs != jwt.NoLimit && len(c.subs) > c.msubs
|
||||
return c.msubs != jwt.NoLimit && len(c.subs) > int(c.msubs)
|
||||
}
|
||||
|
||||
// Helper to determine if we have met or exceeded max subs.
|
||||
func (c *client) subsAtLimit() bool {
|
||||
return c.msubs != jwt.NoLimit && len(c.subs) >= c.msubs
|
||||
return c.msubs != jwt.NoLimit && len(c.subs) >= int(c.msubs)
|
||||
}
|
||||
|
||||
// Apply account limits
|
||||
@@ -481,9 +484,9 @@ func (c *client) applyAccountLimits() {
|
||||
}
|
||||
|
||||
// We check here if the server has an option set that is lower than the account limit.
|
||||
if c.msubs != jwt.NoLimit && opts.MaxSubs != 0 && opts.MaxSubs < c.acc.msubs {
|
||||
if c.msubs != jwt.NoLimit && opts.MaxSubs != 0 && opts.MaxSubs < int(c.acc.msubs) {
|
||||
c.Errorf("Max Subscriptions set to %d from server config which overrides %d from account claims", opts.MaxSubs, c.acc.msubs)
|
||||
c.msubs = opts.MaxSubs
|
||||
c.msubs = int32(opts.MaxSubs)
|
||||
}
|
||||
|
||||
if c.subsExceeded() {
|
||||
@@ -663,7 +666,7 @@ func (c *client) readLoop() {
|
||||
c.mcl = MAX_CONTROL_LINE_SIZE
|
||||
if s != nil {
|
||||
if opts := s.getOpts(); opts != nil {
|
||||
c.mcl = opts.MaxControlLine
|
||||
c.mcl = int32(opts.MaxControlLine)
|
||||
}
|
||||
}
|
||||
defer s.grWG.Done()
|
||||
@@ -758,11 +761,11 @@ func (c *client) readLoop() {
|
||||
// Update read buffer size as/if needed.
|
||||
if n >= cap(b) && cap(b) < maxBufSize {
|
||||
// Grow
|
||||
c.in.rsz = cap(b) * 2
|
||||
c.in.rsz = int32(cap(b) * 2)
|
||||
b = make([]byte, c.in.rsz)
|
||||
} else if n < cap(b) && cap(b) > minBufSize && c.in.srs > shortsToShrink {
|
||||
// Shrink, for now don't accelerate, ping/pong will eventually sort it out.
|
||||
c.in.rsz = cap(b) / 2
|
||||
c.in.rsz = int32(cap(b) / 2)
|
||||
b = make([]byte, c.in.rsz)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
@@ -855,7 +858,7 @@ func (c *client) flushOutbound() bool {
|
||||
c.out.lft = lft
|
||||
|
||||
// Subtract from pending bytes and messages.
|
||||
c.out.pb -= int(n)
|
||||
c.out.pb -= int32(n)
|
||||
c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate.
|
||||
|
||||
// Check for partial writes
|
||||
@@ -902,7 +905,7 @@ func (c *client) flushOutbound() bool {
|
||||
}
|
||||
|
||||
// Adjust based on what we wrote plus any pending.
|
||||
pt := int(n) + c.out.pb
|
||||
pt := int32(n) + c.out.pb
|
||||
|
||||
// Adjust sz as needed downward, keeping power of 2.
|
||||
// We do this at a slower rate.
|
||||
@@ -920,11 +923,11 @@ func (c *client) flushOutbound() bool {
|
||||
// Check to see if we can reuse buffers.
|
||||
if len(cnb) > 0 {
|
||||
oldp := cnb[0][:0]
|
||||
if cap(oldp) >= c.out.sz {
|
||||
if cap(oldp) >= int(c.out.sz) {
|
||||
// Replace primary or secondary if they are nil, reusing same buffer.
|
||||
if c.out.p == nil {
|
||||
c.out.p = oldp
|
||||
} else if c.out.s == nil || cap(c.out.s) < c.out.sz {
|
||||
} else if c.out.s == nil || cap(c.out.s) < int(c.out.sz) {
|
||||
c.out.s = oldp
|
||||
}
|
||||
}
|
||||
@@ -1244,7 +1247,7 @@ func (c *client) queueOutbound(data []byte) bool {
|
||||
// Assume data will not be referenced
|
||||
referenced := false
|
||||
// Add to pending bytes total.
|
||||
c.out.pb += len(data)
|
||||
c.out.pb += int32(len(data))
|
||||
|
||||
// Check for slow consumer via pending bytes limit.
|
||||
// ok to return here, client is going away.
|
||||
@@ -1259,7 +1262,7 @@ func (c *client) queueOutbound(data []byte) bool {
|
||||
if c.out.sz == 0 {
|
||||
c.out.sz = startBufSize
|
||||
}
|
||||
if c.out.s != nil && cap(c.out.s) >= c.out.sz {
|
||||
if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) {
|
||||
c.out.p = c.out.s
|
||||
c.out.s = nil
|
||||
} else {
|
||||
@@ -1272,7 +1275,7 @@ func (c *client) queueOutbound(data []byte) bool {
|
||||
if len(data) > available {
|
||||
// We can fit into existing primary, but message will fit in next one
|
||||
// we allocate or utilize from the secondary. So copy what we can.
|
||||
if available > 0 && len(data) < c.out.sz {
|
||||
if available > 0 && len(data) < int(c.out.sz) {
|
||||
c.out.p = append(c.out.p, data[:available]...)
|
||||
data = data[available:]
|
||||
}
|
||||
@@ -1293,10 +1296,10 @@ func (c *client) queueOutbound(data []byte) bool {
|
||||
if (c.out.sz << 1) <= maxBufSize {
|
||||
c.out.sz <<= 1
|
||||
}
|
||||
if len(data) > c.out.sz {
|
||||
if len(data) > int(c.out.sz) {
|
||||
c.out.p = make([]byte, 0, len(data))
|
||||
} else {
|
||||
if c.out.s != nil && cap(c.out.s) >= c.out.sz { // TODO(dlc) - Size mismatch?
|
||||
if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) { // TODO(dlc) - Size mismatch?
|
||||
c.out.p = c.out.s
|
||||
c.out.s = nil
|
||||
} else {
|
||||
@@ -2125,7 +2128,7 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
// Update statistics
|
||||
// The msg includes the CR_LF, so pull back out for accounting.
|
||||
c.in.msgs++
|
||||
c.in.bytes += len(msg) - LEN_CR_LF
|
||||
c.in.bytes += int32(len(msg) - LEN_CR_LF)
|
||||
|
||||
if c.trace {
|
||||
c.traceMsg(msg)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2018 The NATS Authors
|
||||
// Copyright 2012-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -966,8 +966,8 @@ func TestDynamicBuffers(t *testing.T) {
|
||||
}
|
||||
|
||||
// Create some helper functions and data structures.
|
||||
done := make(chan bool) // Used to stop recording.
|
||||
type maxv struct{ rsz, wsz int } // Used to hold max values.
|
||||
done := make(chan bool) // Used to stop recording.
|
||||
type maxv struct{ rsz, wsz int32 } // Used to hold max values.
|
||||
results := make(chan maxv)
|
||||
|
||||
// stopRecording stops the recording ticker and releases go routine.
|
||||
@@ -976,14 +976,14 @@ func TestDynamicBuffers(t *testing.T) {
|
||||
return <-results
|
||||
}
|
||||
// max just grabs max values.
|
||||
max := func(a, b int) int {
|
||||
max := func(a, b int32) int32 {
|
||||
if a > b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
// Returns current value of the buffer sizes.
|
||||
getBufferSizes := func() (int, int) {
|
||||
getBufferSizes := func() (int32, int32) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.in.rsz, c.out.sz
|
||||
@@ -1013,7 +1013,7 @@ func TestDynamicBuffers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
// Check that the current value is what we expected.
|
||||
checkBuffers := func(ers, ews int) {
|
||||
checkBuffers := func(ers, ews int32) {
|
||||
t.Helper()
|
||||
rsz, wsz := getBufferSizes()
|
||||
if rsz != ers {
|
||||
@@ -1025,7 +1025,7 @@ func TestDynamicBuffers(t *testing.T) {
|
||||
}
|
||||
|
||||
// Check that the max was as expected.
|
||||
checkResults := func(m maxv, rsz, wsz int) {
|
||||
checkResults := func(m maxv, rsz, wsz int32) {
|
||||
t.Helper()
|
||||
if rsz != m.rsz {
|
||||
t.Fatalf("Expected read buffer of %d, but got %d\n", rsz, m.rsz)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2018 The NATS Authors
|
||||
// Copyright 2018-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -632,12 +632,12 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg
|
||||
// If we are here we have interest in tracking this account. Update our accounting.
|
||||
acc.mu.Lock()
|
||||
if acc.strack == nil {
|
||||
acc.strack = make(map[string]int)
|
||||
acc.strack = make(map[string]int32)
|
||||
}
|
||||
// This does not depend on receiving all updates since each one is idempotent.
|
||||
prev := acc.strack[m.Server.ID]
|
||||
acc.strack[m.Server.ID] = m.Conns
|
||||
acc.nrclients += (m.Conns - prev)
|
||||
acc.strack[m.Server.ID] = int32(m.Conns)
|
||||
acc.nrclients += int32(m.Conns) - prev
|
||||
acc.mu.Unlock()
|
||||
|
||||
s.updateRemoteServer(&m.Server)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2018 The NATS Authors
|
||||
// Copyright 2018-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -2070,7 +2070,7 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
|
||||
// Update statistics
|
||||
c.in.msgs++
|
||||
// The msg includes the CR_LF, so pull back out for accounting.
|
||||
c.in.bytes += len(msg) - LEN_CR_LF
|
||||
c.in.bytes += int32(len(msg) - LEN_CR_LF)
|
||||
|
||||
if c.trace {
|
||||
c.traceMsg(msg)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2013-2018 The NATS Authors
|
||||
// Copyright 2013-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -384,7 +384,7 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time) {
|
||||
}
|
||||
|
||||
if client.port != 0 {
|
||||
ci.Port = client.port
|
||||
ci.Port = int(client.port)
|
||||
ci.IP = client.host
|
||||
}
|
||||
}
|
||||
@@ -920,7 +920,7 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
v := &Varz{Info: &s.info, Options: opts, MaxPayload: opts.MaxPayload, Start: s.start}
|
||||
v := &Varz{Info: &s.info, Options: opts, MaxPayload: int(opts.MaxPayload), Start: s.start}
|
||||
v.Now = time.Now()
|
||||
v.Uptime = myUptime(time.Since(s.start))
|
||||
v.Port = v.Info.Port
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2018 The NATS Authors
|
||||
// Copyright 2012-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -105,8 +105,8 @@ type Options struct {
|
||||
HTTPPort int `json:"http_port"`
|
||||
HTTPSPort int `json:"https_port"`
|
||||
AuthTimeout float64 `json:"auth_timeout"`
|
||||
MaxControlLine int `json:"max_control_line"`
|
||||
MaxPayload int `json:"max_payload"`
|
||||
MaxControlLine int32 `json:"max_control_line"`
|
||||
MaxPayload int32 `json:"max_payload"`
|
||||
MaxPending int64 `json:"max_pending"`
|
||||
Cluster ClusterOpts `json:"cluster,omitempty"`
|
||||
Gateway GatewayOpts `json:"gateway,omitempty"`
|
||||
@@ -442,9 +442,19 @@ func (o *Options) ProcessConfigFile(configFile string) error {
|
||||
case "prof_port":
|
||||
o.ProfPort = int(v.(int64))
|
||||
case "max_control_line":
|
||||
o.MaxControlLine = int(v.(int64))
|
||||
if v.(int64) > 1<<31-1 {
|
||||
err := &configErr{tk, fmt.Sprintf("%s value is too big", k)}
|
||||
errors = append(errors, err)
|
||||
continue
|
||||
}
|
||||
o.MaxControlLine = int32(v.(int64))
|
||||
case "max_payload":
|
||||
o.MaxPayload = int(v.(int64))
|
||||
if v.(int64) > 1<<31-1 {
|
||||
err := &configErr{tk, fmt.Sprintf("%s value is too big", k)}
|
||||
errors = append(errors, err)
|
||||
continue
|
||||
}
|
||||
o.MaxPayload = int32(v.(int64))
|
||||
case "max_pending":
|
||||
o.MaxPending = v.(int64)
|
||||
case "max_connections", "max_conn":
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2018 The NATS Authors
|
||||
// Copyright 2012-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -1708,3 +1708,31 @@ func TestParsingGatewaysErrors(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLargeMaxControlLine(t *testing.T) {
|
||||
confFileName := "big_mcl.conf"
|
||||
defer os.Remove(confFileName)
|
||||
content := `
|
||||
max_control_line = 3000000000
|
||||
`
|
||||
if err := ioutil.WriteFile(confFileName, []byte(content), 0666); err != nil {
|
||||
t.Fatalf("Error writing config file: %v", err)
|
||||
}
|
||||
if _, err := ProcessConfigFile(confFileName); err == nil {
|
||||
t.Fatalf("Expected an error from too large of a max_control_line entry")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLargeMaxPayload(t *testing.T) {
|
||||
confFileName := "big_mp.conf"
|
||||
defer os.Remove(confFileName)
|
||||
content := `
|
||||
max_payload = 3000000000
|
||||
`
|
||||
if err := ioutil.WriteFile(confFileName, []byte(content), 0666); err != nil {
|
||||
t.Fatalf("Error writing config file: %v", err)
|
||||
}
|
||||
if _, err := ProcessConfigFile(confFileName); err == nil {
|
||||
t.Fatalf("Expected an error from too large of a max_payload entry")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -808,7 +808,7 @@ func (c *client) parse(buf []byte) error {
|
||||
// Check for violations of control line length here. Note that this is not
|
||||
// exact at all but the performance hit is too great to be precise, and
|
||||
// catching here should prevent memory exhaustion attacks.
|
||||
if len(c.argBuf) > mcl {
|
||||
if len(c.argBuf) > int(mcl) {
|
||||
c.sendErr("Maximum Control Line Exceeded")
|
||||
c.closeConnection(MaxControlLineExceeded)
|
||||
return ErrMaxControlLine
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2017-2018 The NATS Authors
|
||||
// Copyright 2017-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -399,7 +399,7 @@ func (p *portsFileDirOption) Apply(server *Server) {
|
||||
// `max_control_line` setting.
|
||||
type maxControlLineOption struct {
|
||||
noopOption
|
||||
newValue int
|
||||
newValue int32
|
||||
}
|
||||
|
||||
// Apply is a no-op because the max control line will be reloaded after options
|
||||
@@ -412,7 +412,7 @@ func (m *maxControlLineOption) Apply(server *Server) {
|
||||
// setting.
|
||||
type maxPayloadOption struct {
|
||||
noopOption
|
||||
newValue int
|
||||
newValue int32
|
||||
}
|
||||
|
||||
// Apply the setting by updating the server info and each client.
|
||||
@@ -661,9 +661,9 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
|
||||
case "portsfiledir":
|
||||
diffOpts = append(diffOpts, &portsFileDirOption{newValue: newValue.(string), oldValue: oldValue.(string)})
|
||||
case "maxcontrolline":
|
||||
diffOpts = append(diffOpts, &maxControlLineOption{newValue: newValue.(int)})
|
||||
diffOpts = append(diffOpts, &maxControlLineOption{newValue: newValue.(int32)})
|
||||
case "maxpayload":
|
||||
diffOpts = append(diffOpts, &maxPayloadOption{newValue: newValue.(int)})
|
||||
diffOpts = append(diffOpts, &maxPayloadOption{newValue: newValue.(int32)})
|
||||
case "pinginterval":
|
||||
diffOpts = append(diffOpts, &pingIntervalOption{newValue: newValue.(time.Duration)})
|
||||
case "maxpingsout":
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2013-2018 The NATS Authors
|
||||
// Copyright 2013-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -238,7 +238,7 @@ func (c *client) processInboundRoutedMsg(msg []byte) {
|
||||
// Update statistics
|
||||
c.in.msgs++
|
||||
// The msg includes the CR_LF, so pull back out for accounting.
|
||||
c.in.bytes += len(msg) - LEN_CR_LF
|
||||
c.in.bytes += int32(len(msg) - LEN_CR_LF)
|
||||
|
||||
if c.trace {
|
||||
c.traceMsg(msg)
|
||||
@@ -1268,14 +1268,14 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
|
||||
var (
|
||||
_rkey [1024]byte
|
||||
key []byte
|
||||
qi int
|
||||
qi int32
|
||||
)
|
||||
if sub.queue != nil {
|
||||
// Just make the key subject spc group, e.g. 'foo bar'
|
||||
key = _rkey[:0]
|
||||
key = append(key, sub.subject...)
|
||||
key = append(key, byte(' '))
|
||||
qi = len(key)
|
||||
qi = int32(len(key))
|
||||
key = append(key, sub.queue...)
|
||||
} else {
|
||||
key = sub.subject
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2018 The NATS Authors
|
||||
// Copyright 2012-2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -60,7 +60,7 @@ type Info struct {
|
||||
AuthRequired bool `json:"auth_required,omitempty"`
|
||||
TLSRequired bool `json:"tls_required,omitempty"`
|
||||
TLSVerify bool `json:"tls_verify,omitempty"`
|
||||
MaxPayload int `json:"max_payload"`
|
||||
MaxPayload int32 `json:"max_payload"`
|
||||
IP string `json:"ip,omitempty"`
|
||||
CID uint64 `json:"client_id,omitempty"`
|
||||
Nonce string `json:"nonce,omitempty"`
|
||||
@@ -1375,7 +1375,7 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
opts := s.getOpts()
|
||||
|
||||
maxPay := int32(opts.MaxPayload)
|
||||
maxSubs := opts.MaxSubs
|
||||
maxSubs := int32(opts.MaxSubs)
|
||||
// For system, maxSubs of 0 means unlimited, so re-adjust here.
|
||||
if maxSubs == 0 {
|
||||
maxSubs = -1
|
||||
|
||||
Reference in New Issue
Block a user