mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
[ADDED] lame_duck_grace_period option
The grace period used to be hardcoded at 10 seconds. This option allows the user to configure the amount of time the server will wait before initiating the closing of client connections. Note that the grace period needs to be strictly lower than the overall lame_duck_duration. The server deducts the grace period from that overall duration and spreads the closing of connections during that time. For instance, if there are 1000 connections and the lame duck duration is set to 30 seconds and grace period to 10, then the server will use 30-10 = 20 seconds to spread the closing of those 1000 connections, so say roughly 50 clients per second. Resolves #1459. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1090,11 +1090,38 @@ func TestConfigCheck(t *testing.T) {
|
||||
{
|
||||
name: "invalid lame_duck_duration type",
|
||||
config: `
|
||||
lame_duck_duration: abc
|
||||
lame_duck_duration: abc
|
||||
`,
|
||||
err: errors.New(`error parsing lame_duck_duration: time: invalid duration abc`),
|
||||
errorLine: 2,
|
||||
errorPos: 3,
|
||||
errorPos: 5,
|
||||
},
|
||||
{
|
||||
name: "lame_duck_duration too small",
|
||||
config: `
|
||||
lame_duck_duration: "5s"
|
||||
`,
|
||||
err: errors.New(`invalid lame_duck_duration of 5s, minimum is 30 seconds`),
|
||||
errorLine: 2,
|
||||
errorPos: 5,
|
||||
},
|
||||
{
|
||||
name: "invalid lame_duck_grace_period type",
|
||||
config: `
|
||||
lame_duck_grace_period: abc
|
||||
`,
|
||||
err: errors.New(`error parsing lame_duck_grace_period: time: invalid duration abc`),
|
||||
errorLine: 2,
|
||||
errorPos: 5,
|
||||
},
|
||||
{
|
||||
name: "lame_duck_grace_period should be positive",
|
||||
config: `
|
||||
lame_duck_grace_period: "-5s"
|
||||
`,
|
||||
err: errors.New(`invalid lame_duck_grace_period, needs to be positive`),
|
||||
errorLine: 2,
|
||||
errorPos: 5,
|
||||
},
|
||||
{
|
||||
name: "when only setting TLS timeout for a leafnode remote",
|
||||
|
||||
@@ -146,6 +146,10 @@ const (
|
||||
// the closing of clients when signaled to go in lame duck mode.
|
||||
DEFAULT_LAME_DUCK_DURATION = 2 * time.Minute
|
||||
|
||||
// DEFAULT_LAME_DUCK_GRACE_PERIOD is the duration the server waits, after entering
|
||||
// lame duck mode, before starting closing client connections.
|
||||
DEFAULT_LAME_DUCK_GRACE_PERIOD = 10 * time.Second
|
||||
|
||||
// DEFAULT_LEAFNODE_INFO_WAIT Route dial timeout.
|
||||
DEFAULT_LEAFNODE_INFO_WAIT = 1 * time.Second
|
||||
|
||||
|
||||
@@ -211,6 +211,7 @@ type Options struct {
|
||||
WriteDeadline time.Duration `json:"-"`
|
||||
MaxClosedClients int `json:"-"`
|
||||
LameDuckDuration time.Duration `json:"-"`
|
||||
LameDuckGracePeriod time.Duration `json:"-"`
|
||||
|
||||
// MaxTracedMsgLen is the maximum printable length for traced messages.
|
||||
MaxTracedMsgLen int `json:"-"`
|
||||
@@ -729,6 +730,19 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
|
||||
return
|
||||
}
|
||||
o.LameDuckDuration = dur
|
||||
case "lame_duck_grace_period":
|
||||
dur, err := time.ParseDuration(v.(string))
|
||||
if err != nil {
|
||||
err := &configErr{tk, fmt.Sprintf("error parsing lame_duck_grace_period: %v", err)}
|
||||
*errors = append(*errors, err)
|
||||
return
|
||||
}
|
||||
if dur < 0 {
|
||||
err := &configErr{tk, "invalid lame_duck_grace_period, needs to be positive"}
|
||||
*errors = append(*errors, err)
|
||||
return
|
||||
}
|
||||
o.LameDuckGracePeriod = dur
|
||||
case "operator", "operators", "roots", "root", "root_operators", "root_operator":
|
||||
opFiles := []string{}
|
||||
switch v := v.(type) {
|
||||
@@ -3408,6 +3422,9 @@ func setBaselineOptions(opts *Options) {
|
||||
if opts.LameDuckDuration == 0 {
|
||||
opts.LameDuckDuration = DEFAULT_LAME_DUCK_DURATION
|
||||
}
|
||||
if opts.LameDuckGracePeriod == 0 {
|
||||
opts.LameDuckGracePeriod = DEFAULT_LAME_DUCK_GRACE_PERIOD
|
||||
}
|
||||
if opts.Gateway.Port != 0 {
|
||||
if opts.Gateway.Host == "" {
|
||||
opts.Gateway.Host = DEFAULT_HOST
|
||||
|
||||
@@ -45,20 +45,21 @@ func checkOptionsEqual(t *testing.T, golden, opts *Options) {
|
||||
|
||||
func TestDefaultOptions(t *testing.T) {
|
||||
golden := &Options{
|
||||
Host: DEFAULT_HOST,
|
||||
Port: DEFAULT_PORT,
|
||||
MaxConn: DEFAULT_MAX_CONNECTIONS,
|
||||
HTTPHost: DEFAULT_HOST,
|
||||
PingInterval: DEFAULT_PING_INTERVAL,
|
||||
MaxPingsOut: DEFAULT_PING_MAX_OUT,
|
||||
TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second),
|
||||
AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second),
|
||||
MaxControlLine: MAX_CONTROL_LINE_SIZE,
|
||||
MaxPayload: MAX_PAYLOAD_SIZE,
|
||||
MaxPending: MAX_PENDING_SIZE,
|
||||
WriteDeadline: DEFAULT_FLUSH_DEADLINE,
|
||||
MaxClosedClients: DEFAULT_MAX_CLOSED_CLIENTS,
|
||||
LameDuckDuration: DEFAULT_LAME_DUCK_DURATION,
|
||||
Host: DEFAULT_HOST,
|
||||
Port: DEFAULT_PORT,
|
||||
MaxConn: DEFAULT_MAX_CONNECTIONS,
|
||||
HTTPHost: DEFAULT_HOST,
|
||||
PingInterval: DEFAULT_PING_INTERVAL,
|
||||
MaxPingsOut: DEFAULT_PING_MAX_OUT,
|
||||
TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second),
|
||||
AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second),
|
||||
MaxControlLine: MAX_CONTROL_LINE_SIZE,
|
||||
MaxPayload: MAX_PAYLOAD_SIZE,
|
||||
MaxPending: MAX_PENDING_SIZE,
|
||||
WriteDeadline: DEFAULT_FLUSH_DEADLINE,
|
||||
MaxClosedClients: DEFAULT_MAX_CLOSED_CLIENTS,
|
||||
LameDuckDuration: DEFAULT_LAME_DUCK_DURATION,
|
||||
LameDuckGracePeriod: DEFAULT_LAME_DUCK_GRACE_PERIOD,
|
||||
LeafNode: LeafNodeOpts{
|
||||
ReconnectInterval: DEFAULT_LEAF_NODE_RECONNECT,
|
||||
},
|
||||
|
||||
@@ -45,9 +45,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// Time to wait before starting closing clients when in LD mode.
|
||||
lameDuckModeDefaultInitialDelay = int64(10 * time.Second)
|
||||
|
||||
// Interval for the first PING for non client connections.
|
||||
firstPingInterval = time.Second
|
||||
|
||||
@@ -55,9 +52,6 @@ const (
|
||||
firstClientPingInterval = 2 * time.Second
|
||||
)
|
||||
|
||||
// Make this a variable so that we can change during tests
|
||||
var lameDuckModeInitialDelay = int64(lameDuckModeDefaultInitialDelay)
|
||||
|
||||
// Info is the information sent to clients, routes, gateways, and leaf nodes,
|
||||
// to help them understand information about this server.
|
||||
type Info struct {
|
||||
@@ -427,6 +421,10 @@ func (s *Server) ClientURL() string {
|
||||
}
|
||||
|
||||
func validateOptions(o *Options) error {
|
||||
if o.LameDuckDuration > 0 && o.LameDuckGracePeriod >= o.LameDuckDuration {
|
||||
return fmt.Errorf("lame duck grace period (%v) should be strictly lower than lame duck duration (%v)",
|
||||
o.LameDuckGracePeriod, o.LameDuckDuration)
|
||||
}
|
||||
// Check that the trust configuration is correct.
|
||||
if err := validateTrustedOperators(o); err != nil {
|
||||
return err
|
||||
@@ -2716,6 +2714,14 @@ func (s *Server) lameDuckMode() {
|
||||
s.websocket.listener = nil
|
||||
}
|
||||
s.ldmCh = make(chan bool, expected)
|
||||
opts := s.getOpts()
|
||||
gp := opts.LameDuckGracePeriod
|
||||
// For tests, we want the grace period to be in some cases bigger
|
||||
// than the ldm duration, so to by-pass the validateOptions() check,
|
||||
// we use negative number and flip it here.
|
||||
if gp < 0 {
|
||||
gp *= -1
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
// Wait for accept loops to be done to make sure that no new
|
||||
@@ -2734,8 +2740,8 @@ func (s *Server) lameDuckMode() {
|
||||
s.Shutdown()
|
||||
return
|
||||
}
|
||||
dur := int64(s.getOpts().LameDuckDuration)
|
||||
dur -= atomic.LoadInt64(&lameDuckModeInitialDelay)
|
||||
dur := int64(opts.LameDuckDuration)
|
||||
dur -= int64(gp)
|
||||
if dur <= 0 {
|
||||
dur = int64(time.Second)
|
||||
}
|
||||
@@ -2767,7 +2773,7 @@ func (s *Server) lameDuckMode() {
|
||||
s.sendLDMToClients()
|
||||
s.mu.Unlock()
|
||||
|
||||
t := time.NewTimer(time.Duration(atomic.LoadInt64(&lameDuckModeInitialDelay)))
|
||||
t := time.NewTimer(gp)
|
||||
// Delay start of closing of client connections in case
|
||||
// we have several servers that we want to signal to enter LD mode
|
||||
// and not have their client reconnect to each other.
|
||||
|
||||
@@ -665,11 +665,30 @@ func TestProfilingNoTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLameDuckMode(t *testing.T) {
|
||||
atomic.StoreInt64(&lameDuckModeInitialDelay, 0)
|
||||
defer atomic.StoreInt64(&lameDuckModeInitialDelay, lameDuckModeDefaultInitialDelay)
|
||||
func TestLameDuckOptionsValidation(t *testing.T) {
|
||||
o := DefaultOptions()
|
||||
o.LameDuckDuration = 5 * time.Second
|
||||
o.LameDuckGracePeriod = 10 * time.Second
|
||||
s, err := NewServer(o)
|
||||
if s != nil {
|
||||
s.Shutdown()
|
||||
}
|
||||
if err == nil || !strings.Contains(err.Error(), "should be strictly lower") {
|
||||
t.Fatalf("Expected error saying that ldm grace period should be lower than ldm duration, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func testSetLDMGracePeriod(o *Options, val time.Duration) {
|
||||
// For tests, we set the grace period as a negative value
|
||||
// so we can have a grace period bigger than the total duration.
|
||||
// When validating options, we would not be able to run the
|
||||
// server without this trick.
|
||||
o.LameDuckGracePeriod = val * -1
|
||||
}
|
||||
|
||||
func TestLameDuckMode(t *testing.T) {
|
||||
optsA := DefaultOptions()
|
||||
testSetLDMGracePeriod(optsA, time.Nanosecond)
|
||||
optsA.Cluster.Host = "127.0.0.1"
|
||||
srvA := RunServer(optsA)
|
||||
defer srvA.Shutdown()
|
||||
@@ -830,19 +849,21 @@ func TestLameDuckMode(t *testing.T) {
|
||||
// Now test that we introduce delay before starting closing client connections.
|
||||
// This allow to "signal" multiple servers and avoid their clients to reconnect
|
||||
// to a server that is going to be going in LD mode.
|
||||
atomic.StoreInt64(&lameDuckModeInitialDelay, int64(100*time.Millisecond))
|
||||
|
||||
testSetLDMGracePeriod(optsA, 100*time.Millisecond)
|
||||
optsA.LameDuckDuration = 10 * time.Millisecond
|
||||
srvA = RunServer(optsA)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
|
||||
testSetLDMGracePeriod(optsB, 100*time.Millisecond)
|
||||
optsB.LameDuckDuration = 10 * time.Millisecond
|
||||
srvB = RunServer(optsB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
optsC := DefaultOptions()
|
||||
optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
|
||||
testSetLDMGracePeriod(optsC, 100*time.Millisecond)
|
||||
optsC.LameDuckGracePeriod = -100 * time.Millisecond
|
||||
optsC.LameDuckDuration = 10 * time.Millisecond
|
||||
srvC := RunServer(optsC)
|
||||
defer srvC.Shutdown()
|
||||
@@ -878,15 +899,13 @@ func TestLameDuckMode(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLameDuckModeInfo(t *testing.T) {
|
||||
// Ensure that initial delay is set very high so that we can
|
||||
// check that some events occur as expected before the client
|
||||
// is disconnected.
|
||||
atomic.StoreInt64(&lameDuckModeInitialDelay, int64(5*time.Second))
|
||||
defer atomic.StoreInt64(&lameDuckModeInitialDelay, lameDuckModeDefaultInitialDelay)
|
||||
|
||||
optsA := testWSOptions()
|
||||
optsA.Cluster.Host = "127.0.0.1"
|
||||
optsA.Cluster.Port = -1
|
||||
// Ensure that initial delay is set very high so that we can
|
||||
// check that some events occur as expected before the client
|
||||
// is disconnected.
|
||||
testSetLDMGracePeriod(optsA, 5*time.Second)
|
||||
optsA.LameDuckDuration = 50 * time.Millisecond
|
||||
optsA.DisableShortFirstPing = true
|
||||
srvA := RunServer(optsA)
|
||||
@@ -954,6 +973,7 @@ func TestLameDuckModeInfo(t *testing.T) {
|
||||
checkConnectURLs(expected)
|
||||
|
||||
optsC := testWSOptions()
|
||||
testSetLDMGracePeriod(optsA, 5*time.Second)
|
||||
optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
|
||||
srvC := RunServer(optsC)
|
||||
defer srvC.Shutdown()
|
||||
|
||||
Reference in New Issue
Block a user