From cd6d71deaa1e8a9456feed5458fbdb4898d8a452 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 8 Jun 2020 11:43:25 -0600 Subject: [PATCH] [ADDED] lame_duck_grace_period option The grace period used to be hardcoded at 10 seconds. This option allows the user to configure the amount of time the server will wait before initiating the closing of client connections. Note that the grace period needs to be strictly lower than the overall lame_duck_duration. The server deducts the grace period from that overall duration and spreads the closing of connections during that time. For instance, if there are 1000 connections and the lame duck duration is set to 30 seconds and grace period to 10, then the server will use 30-10 = 20 seconds to spread the closing of those 1000 connections, so say roughly 50 clients per second. Resolves #1459. Signed-off-by: Ivan Kozlovic --- server/config_check_test.go | 31 +++++++++++++++++++++++++-- server/const.go | 4 ++++ server/opts.go | 17 +++++++++++++++ server/opts_test.go | 29 ++++++++++++------------- server/server.go | 24 +++++++++++++-------- server/server_test.go | 42 +++++++++++++++++++++++++++---------- 6 files changed, 111 insertions(+), 36 deletions(-) diff --git a/server/config_check_test.go b/server/config_check_test.go index 0485c613..63ef54ef 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -1090,11 +1090,38 @@ func TestConfigCheck(t *testing.T) { { name: "invalid lame_duck_duration type", config: ` - lame_duck_duration: abc + lame_duck_duration: abc `, err: errors.New(`error parsing lame_duck_duration: time: invalid duration abc`), errorLine: 2, - errorPos: 3, + errorPos: 5, + }, + { + name: "lame_duck_duration too small", + config: ` + lame_duck_duration: "5s" + `, + err: errors.New(`invalid lame_duck_duration of 5s, minimum is 30 seconds`), + errorLine: 2, + errorPos: 5, + }, + { + name: "invalid lame_duck_grace_period type", + config: ` + lame_duck_grace_period: abc + `, + err: errors.New(`error parsing lame_duck_grace_period: time: invalid duration abc`), + errorLine: 2, + errorPos: 5, + }, + { + name: "lame_duck_grace_period should be positive", + config: ` + lame_duck_grace_period: "-5s" + `, + err: errors.New(`invalid lame_duck_grace_period, needs to be positive`), + errorLine: 2, + errorPos: 5, }, { name: "when only setting TLS timeout for a leafnode remote", diff --git a/server/const.go b/server/const.go index 35a8c853..f590a3cc 100644 --- a/server/const.go +++ b/server/const.go @@ -146,6 +146,10 @@ const ( // the closing of clients when signaled to go in lame duck mode. DEFAULT_LAME_DUCK_DURATION = 2 * time.Minute + // DEFAULT_LAME_DUCK_GRACE_PERIOD is the duration the server waits, after entering + // lame duck mode, before starting closing client connections. + DEFAULT_LAME_DUCK_GRACE_PERIOD = 10 * time.Second + // DEFAULT_LEAFNODE_INFO_WAIT Route dial timeout. DEFAULT_LEAFNODE_INFO_WAIT = 1 * time.Second diff --git a/server/opts.go b/server/opts.go index 0ee18590..f747af1f 100644 --- a/server/opts.go +++ b/server/opts.go @@ -211,6 +211,7 @@ type Options struct { WriteDeadline time.Duration `json:"-"` MaxClosedClients int `json:"-"` LameDuckDuration time.Duration `json:"-"` + LameDuckGracePeriod time.Duration `json:"-"` // MaxTracedMsgLen is the maximum printable length for traced messages. MaxTracedMsgLen int `json:"-"` @@ -729,6 +730,19 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error return } o.LameDuckDuration = dur + case "lame_duck_grace_period": + dur, err := time.ParseDuration(v.(string)) + if err != nil { + err := &configErr{tk, fmt.Sprintf("error parsing lame_duck_grace_period: %v", err)} + *errors = append(*errors, err) + return + } + if dur < 0 { + err := &configErr{tk, "invalid lame_duck_grace_period, needs to be positive"} + *errors = append(*errors, err) + return + } + o.LameDuckGracePeriod = dur case "operator", "operators", "roots", "root", "root_operators", "root_operator": opFiles := []string{} switch v := v.(type) { @@ -3408,6 +3422,9 @@ func setBaselineOptions(opts *Options) { if opts.LameDuckDuration == 0 { opts.LameDuckDuration = DEFAULT_LAME_DUCK_DURATION } + if opts.LameDuckGracePeriod == 0 { + opts.LameDuckGracePeriod = DEFAULT_LAME_DUCK_GRACE_PERIOD + } if opts.Gateway.Port != 0 { if opts.Gateway.Host == "" { opts.Gateway.Host = DEFAULT_HOST diff --git a/server/opts_test.go b/server/opts_test.go index 00140da7..d257b145 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -45,20 +45,21 @@ func checkOptionsEqual(t *testing.T, golden, opts *Options) { func TestDefaultOptions(t *testing.T) { golden := &Options{ - Host: DEFAULT_HOST, - Port: DEFAULT_PORT, - MaxConn: DEFAULT_MAX_CONNECTIONS, - HTTPHost: DEFAULT_HOST, - PingInterval: DEFAULT_PING_INTERVAL, - MaxPingsOut: DEFAULT_PING_MAX_OUT, - TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second), - AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second), - MaxControlLine: MAX_CONTROL_LINE_SIZE, - MaxPayload: MAX_PAYLOAD_SIZE, - MaxPending: MAX_PENDING_SIZE, - WriteDeadline: DEFAULT_FLUSH_DEADLINE, - MaxClosedClients: DEFAULT_MAX_CLOSED_CLIENTS, - LameDuckDuration: DEFAULT_LAME_DUCK_DURATION, + Host: DEFAULT_HOST, + Port: DEFAULT_PORT, + MaxConn: DEFAULT_MAX_CONNECTIONS, + HTTPHost: DEFAULT_HOST, + PingInterval: DEFAULT_PING_INTERVAL, + MaxPingsOut: DEFAULT_PING_MAX_OUT, + TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second), + AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second), + MaxControlLine: MAX_CONTROL_LINE_SIZE, + MaxPayload: MAX_PAYLOAD_SIZE, + MaxPending: MAX_PENDING_SIZE, + WriteDeadline: DEFAULT_FLUSH_DEADLINE, + MaxClosedClients: DEFAULT_MAX_CLOSED_CLIENTS, + LameDuckDuration: DEFAULT_LAME_DUCK_DURATION, + LameDuckGracePeriod: DEFAULT_LAME_DUCK_GRACE_PERIOD, LeafNode: LeafNodeOpts{ ReconnectInterval: DEFAULT_LEAF_NODE_RECONNECT, }, diff --git a/server/server.go b/server/server.go index 50fe1046..47757339 100644 --- a/server/server.go +++ b/server/server.go @@ -45,9 +45,6 @@ import ( ) const ( - // Time to wait before starting closing clients when in LD mode. - lameDuckModeDefaultInitialDelay = int64(10 * time.Second) - // Interval for the first PING for non client connections. firstPingInterval = time.Second @@ -55,9 +52,6 @@ const ( firstClientPingInterval = 2 * time.Second ) -// Make this a variable so that we can change during tests -var lameDuckModeInitialDelay = int64(lameDuckModeDefaultInitialDelay) - // Info is the information sent to clients, routes, gateways, and leaf nodes, // to help them understand information about this server. type Info struct { @@ -427,6 +421,10 @@ func (s *Server) ClientURL() string { } func validateOptions(o *Options) error { + if o.LameDuckDuration > 0 && o.LameDuckGracePeriod >= o.LameDuckDuration { + return fmt.Errorf("lame duck grace period (%v) should be strictly lower than lame duck duration (%v)", + o.LameDuckGracePeriod, o.LameDuckDuration) + } // Check that the trust configuration is correct. if err := validateTrustedOperators(o); err != nil { return err @@ -2716,6 +2714,14 @@ func (s *Server) lameDuckMode() { s.websocket.listener = nil } s.ldmCh = make(chan bool, expected) + opts := s.getOpts() + gp := opts.LameDuckGracePeriod + // For tests, we want the grace period to be in some cases bigger + // than the ldm duration, so to by-pass the validateOptions() check, + // we use negative number and flip it here. + if gp < 0 { + gp *= -1 + } s.mu.Unlock() // Wait for accept loops to be done to make sure that no new @@ -2734,8 +2740,8 @@ func (s *Server) lameDuckMode() { s.Shutdown() return } - dur := int64(s.getOpts().LameDuckDuration) - dur -= atomic.LoadInt64(&lameDuckModeInitialDelay) + dur := int64(opts.LameDuckDuration) + dur -= int64(gp) if dur <= 0 { dur = int64(time.Second) } @@ -2767,7 +2773,7 @@ func (s *Server) lameDuckMode() { s.sendLDMToClients() s.mu.Unlock() - t := time.NewTimer(time.Duration(atomic.LoadInt64(&lameDuckModeInitialDelay))) + t := time.NewTimer(gp) // Delay start of closing of client connections in case // we have several servers that we want to signal to enter LD mode // and not have their client reconnect to each other. diff --git a/server/server_test.go b/server/server_test.go index c3e805a1..97f6ee08 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -665,11 +665,30 @@ func TestProfilingNoTimeout(t *testing.T) { } } -func TestLameDuckMode(t *testing.T) { - atomic.StoreInt64(&lameDuckModeInitialDelay, 0) - defer atomic.StoreInt64(&lameDuckModeInitialDelay, lameDuckModeDefaultInitialDelay) +func TestLameDuckOptionsValidation(t *testing.T) { + o := DefaultOptions() + o.LameDuckDuration = 5 * time.Second + o.LameDuckGracePeriod = 10 * time.Second + s, err := NewServer(o) + if s != nil { + s.Shutdown() + } + if err == nil || !strings.Contains(err.Error(), "should be strictly lower") { + t.Fatalf("Expected error saying that ldm grace period should be lower than ldm duration, got %v", err) + } +} +func testSetLDMGracePeriod(o *Options, val time.Duration) { + // For tests, we set the grace period as a negative value + // so we can have a grace period bigger than the total duration. + // When validating options, we would not be able to run the + // server without this trick. + o.LameDuckGracePeriod = val * -1 +} + +func TestLameDuckMode(t *testing.T) { optsA := DefaultOptions() + testSetLDMGracePeriod(optsA, time.Nanosecond) optsA.Cluster.Host = "127.0.0.1" srvA := RunServer(optsA) defer srvA.Shutdown() @@ -830,19 +849,21 @@ func TestLameDuckMode(t *testing.T) { // Now test that we introduce delay before starting closing client connections. // This allow to "signal" multiple servers and avoid their clients to reconnect // to a server that is going to be going in LD mode. - atomic.StoreInt64(&lameDuckModeInitialDelay, int64(100*time.Millisecond)) - + testSetLDMGracePeriod(optsA, 100*time.Millisecond) optsA.LameDuckDuration = 10 * time.Millisecond srvA = RunServer(optsA) defer srvA.Shutdown() optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port)) + testSetLDMGracePeriod(optsB, 100*time.Millisecond) optsB.LameDuckDuration = 10 * time.Millisecond srvB = RunServer(optsB) defer srvB.Shutdown() optsC := DefaultOptions() optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port)) + testSetLDMGracePeriod(optsC, 100*time.Millisecond) + optsC.LameDuckGracePeriod = -100 * time.Millisecond optsC.LameDuckDuration = 10 * time.Millisecond srvC := RunServer(optsC) defer srvC.Shutdown() @@ -878,15 +899,13 @@ func TestLameDuckMode(t *testing.T) { } func TestLameDuckModeInfo(t *testing.T) { - // Ensure that initial delay is set very high so that we can - // check that some events occur as expected before the client - // is disconnected. - atomic.StoreInt64(&lameDuckModeInitialDelay, int64(5*time.Second)) - defer atomic.StoreInt64(&lameDuckModeInitialDelay, lameDuckModeDefaultInitialDelay) - optsA := testWSOptions() optsA.Cluster.Host = "127.0.0.1" optsA.Cluster.Port = -1 + // Ensure that initial delay is set very high so that we can + // check that some events occur as expected before the client + // is disconnected. + testSetLDMGracePeriod(optsA, 5*time.Second) optsA.LameDuckDuration = 50 * time.Millisecond optsA.DisableShortFirstPing = true srvA := RunServer(optsA) @@ -954,6 +973,7 @@ func TestLameDuckModeInfo(t *testing.T) { checkConnectURLs(expected) optsC := testWSOptions() + testSetLDMGracePeriod(optsA, 5*time.Second) optsC.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port)) srvC := RunServer(optsC) defer srvC.Shutdown()