Merge pull request #956 from nats-io/mem

Reduce startup memory for cluster
This commit is contained in:
Derek Collison
2019-04-17 11:21:50 -07:00
committed by GitHub
3 changed files with 62 additions and 11 deletions

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2018 The NATS Authors
// Copyright 2012-2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -40,7 +40,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.0.0-RC5"
VERSION = "2.0.0-RC6"
// PROTO is the currently supported protocol.
// 0 was the original

View File

@@ -885,13 +885,12 @@ func (c *client) processRemoteSub(argo []byte) (err error) {
// complete interest for all subjects, both normal as a binary
// and queue group weights.
func (s *Server) sendSubsToRoute(route *client) {
// Send over our account subscriptions.
var _accs [4096]*Account
accs := _accs[:0]
// copy accounts into array first
s.mu.Lock()
// Estimated size of all protocols. It does not have to be accurate at all.
eSize := 0
// Send over our account subscriptions.
// copy accounts into array first
accs := make([]*Account, 0, len(s.accounts))
for _, a := range s.accounts {
accs = append(accs, a)
a.mu.RLock()
@@ -905,7 +904,7 @@ func (s *Server) sendSubsToRoute(route *client) {
s.mu.Unlock()
sendSubs := func(accs []*Account) {
var raw [4096]*subscription
var raw [32]*subscription
var closed bool
route.mu.Lock()
@@ -983,11 +982,10 @@ func (c *client) sendRouteUnSubProtos(subs []*subscription, trace bool, filter f
// Use sendRouteSubProtos or sendRouteUnSubProtos instead for clarity.
// Lock is held on entry.
func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, trace bool, filter func(sub *subscription) bool) bool {
const staticBufSize = maxBufSize * 2
var (
_buf [staticBufSize]byte // array on stack
_buf [1024]byte // array on stack
buf = _buf[:0] // our buffer will initially point to the stack buffer
mbs = staticBufSize // max size of the buffer
mbs = maxBufSize * 2 // max size of the buffer
mpMax = int(c.out.mp / 2) // 50% of max_pending
closed bool
)
@@ -1163,7 +1161,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
// Do final client initialization
// Initialize the per-account cache.
c.in.pacache = make(map[string]*perAccountCache, maxPerAccountCacheSize)
c.in.pacache = make(map[string]*perAccountCache)
if didSolicit {
// Set permissions associated with the route user (if applicable).
// No lock needed since we are already under client lock.

View File

@@ -17,6 +17,8 @@ import (
"encoding/json"
"fmt"
"net"
"net/url"
"runtime"
"testing"
"time"
@@ -1598,3 +1600,54 @@ func TestNewRouteLargeDistinctQueueSubscribers(t *testing.T) {
return nil
})
}
// TestLargeClusterMem starts a 25-node cluster and verifies that total heap
// allocation during startup stays under a 50MB budget. It measures the delta
// of runtime.MemStats.TotalAlloc (cumulative bytes allocated) around cluster
// formation, so it is a coarse regression guard, not a precise measurement.
func TestLargeClusterMem(t *testing.T) {
	// Try to clean up so the baseline reading is as small as possible.
	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	pta := m.TotalAlloc

	// Fresh options per server; port -1 means pick a random available port.
	opts := func() *server.Options {
		o := DefaultTestOptions
		o.Host = "127.0.0.1"
		o.Port = -1
		o.Cluster.Host = o.Host
		o.Cluster.Port = -1
		return &o
	}

	var servers []*server.Server
	// Shut every server down even if the test fails before the end.
	defer func() {
		for _, s := range servers {
			s.Shutdown()
		}
	}()

	// Create seed first.
	o := opts()
	s := RunServer(o)
	servers = append(servers, s)

	// For connecting to seed server above.
	routeAddr := fmt.Sprintf("nats-route://%s:%d", o.Cluster.Host, o.Cluster.Port)
	rurl, _ := url.Parse(routeAddr)
	routes := []*url.URL{rurl}

	numServers := 25
	for i := 1; i < numServers; i++ {
		o := opts()
		o.Routes = routes
		s := RunServer(o)
		servers = append(servers, s)
	}
	checkClusterFormed(t, servers...)

	// Memory budget for forming the whole cluster, in bytes.
	const maxBytes = 50 * 1024 * 1024 // 50MB

	// BUG FIX: the original divided the usage down to megabytes and then
	// compared it against a byte-valued threshold (50*1024*1024), so the
	// limit could effectively never trip (~50TB would have been required).
	// Compare in bytes and convert to MB only for the failure message.
	runtime.ReadMemStats(&m)
	used := m.TotalAlloc - pta
	if used > maxBytes {
		t.Fatalf("Cluster using too much memory, expect < 50MB, got %dMB", used/(1024*1024))
	}
}