Added support for lame duck mode

When receiving the SIGUSR2 signal (or -sl ldm) the server stops
accepting new clients, closes route connections, and spreads the
closing of client connections over a configurable lame duck duration
(default is 30s). This helps prevent a storm of client
reconnects when a server needs to be shut down.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2018-10-19 19:07:37 -06:00
parent 8dfce4b3d7
commit 0067c3bb04
14 changed files with 322 additions and 32 deletions

View File

@@ -128,6 +128,7 @@ const (
DuplicateRoute
RouteRemoved
ServerShutdown
LameDuckMode
)
type client struct {
@@ -2103,8 +2104,8 @@ func (c *client) closeConnection(reason ClosedState) {
c.sl.RemoveBatch(subs)
if srv != nil {
// This is a route that disconnected...
if len(connectURLs) > 0 {
// This is a route that disconnected, but we are not in lame duck mode...
if len(connectURLs) > 0 && !srv.isLameDuckMode() {
// Unless disabled, possibly update the server's INFO protocol
// and send to clients that know how to handle async INFOs.
if !srv.getOpts().Cluster.NoAdvertise {

View File

@@ -1060,6 +1060,15 @@ func TestConfigCheck(t *testing.T) {
errorLine: 3,
errorPos: 5,
},
{
name: "invalid lame_duck_duration type",
config: `
lame_duck_duration: abc
`,
err: errors.New(`error parsing lame_duck_duration: time: invalid duration abc`),
errorLine: 2,
errorPos: 3,
},
}
checkConfig := func(config string) error {

View File

@@ -44,3 +44,5 @@ ping_max: 3
# how long server can block on a socket write to a client
write_deadline: "3s"
lame_duck_duration: "4s"

View File

@@ -26,6 +26,9 @@ const (
CommandQuit = Command("quit")
CommandReopen = Command("reopen")
CommandReload = Command("reload")
// private for now
commandLDMode = Command("ldm")
)
var (
@@ -127,4 +130,8 @@ const (
// DEFAULT_TTL_AE_RESPONSE_MAP
DEFAULT_TTL_AE_RESPONSE_MAP = 10 * time.Minute
// DEFAULT_LAME_DUCK_DURATION is the time in which the server spreads
// the closing of clients when signaled to go in lame duck mode.
DEFAULT_LAME_DUCK_DURATION = 30 * time.Second
)

View File

@@ -395,8 +395,8 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time) {
func (c *client) getRTT() string {
if c.rtt == 0 {
// If a real client, go ahead and send ping now to get a value
// for RTT. For tests and telnet, etc skip.
if c.flags.isSet(connectReceived) && c.opts.Lang != "" {
// for RTT. For tests and telnet, or if client is closing, etc skip.
if !c.flags.isSet(clearConnection) && c.flags.isSet(connectReceived) && c.opts.Lang != "" {
c.sendPing()
}
return ""
@@ -1038,6 +1038,8 @@ func (reason ClosedState) String() string {
return "Route Removed"
case ServerShutdown:
return "Server Shutdown"
case LameDuckMode:
return "Lame Duck Mode"
}
return "Unknown State"
}

View File

@@ -95,6 +95,7 @@ type Options struct {
WriteDeadline time.Duration `json:"-"`
RQSubsSweep time.Duration `json:"-"`
MaxClosedClients int `json:"-"`
LameDuckDuration time.Duration `json:"-"`
CustomClientAuthentication Authentication `json:"-"`
CustomRouterAuthentication Authentication `json:"-"`
@@ -409,6 +410,14 @@ func (o *Options) ProcessConfigFile(configFile string) error {
}
warnings = append(warnings, err)
}
case "lame_duck_duration":
dur, err := time.ParseDuration(v.(string))
if err != nil {
err := &configErr{tk, fmt.Sprintf("error parsing lame_duck_duration: %v", err)}
errors = append(errors, err)
continue
}
o.LameDuckDuration = dur
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
@@ -1782,6 +1791,9 @@ func processOptions(opts *Options) {
if opts.MaxClosedClients == 0 {
opts.MaxClosedClients = DEFAULT_MAX_CLOSED_CLIENTS
}
if opts.LameDuckDuration == 0 {
opts.LameDuckDuration = DEFAULT_LAME_DUCK_DURATION
}
}
// ConfigureOptions accepts a flag set and augment it with NATS Server

View File

@@ -43,6 +43,7 @@ func TestDefaultOptions(t *testing.T) {
WriteDeadline: DEFAULT_FLUSH_DEADLINE,
RQSubsSweep: DEFAULT_REMOTE_QSUBS_SWEEPER,
MaxClosedClients: DEFAULT_MAX_CLOSED_CLIENTS,
LameDuckDuration: DEFAULT_LAME_DUCK_DURATION,
}
opts := &Options{}
@@ -66,28 +67,29 @@ func TestOptions_RandomPort(t *testing.T) {
func TestConfigFile(t *testing.T) {
golden := &Options{
ConfigFile: "./configs/test.conf",
Host: "127.0.0.1",
Port: 4242,
Username: "derek",
Password: "porkchop",
AuthTimeout: 1.0,
Debug: false,
Trace: true,
Logtime: false,
HTTPPort: 8222,
PidFile: "/tmp/gnatsd.pid",
ProfPort: 6543,
Syslog: true,
RemoteSyslog: "udp://foo.com:33",
MaxControlLine: 2048,
MaxPayload: 65536,
MaxConn: 100,
MaxSubs: 1000,
MaxPending: 10000000,
PingInterval: 60 * time.Second,
MaxPingsOut: 3,
WriteDeadline: 3 * time.Second,
ConfigFile: "./configs/test.conf",
Host: "127.0.0.1",
Port: 4242,
Username: "derek",
Password: "porkchop",
AuthTimeout: 1.0,
Debug: false,
Trace: true,
Logtime: false,
HTTPPort: 8222,
PidFile: "/tmp/gnatsd.pid",
ProfPort: 6543,
Syslog: true,
RemoteSyslog: "udp://foo.com:33",
MaxControlLine: 2048,
MaxPayload: 65536,
MaxConn: 100,
MaxSubs: 1000,
MaxPending: 10000000,
PingInterval: 60 * time.Second,
MaxPingsOut: 3,
WriteDeadline: 3 * time.Second,
LameDuckDuration: 4 * time.Second,
}
opts, err := ProcessConfigFile("./configs/test.conf")
@@ -248,7 +250,8 @@ func TestMergeOverrides(t *testing.T) {
NoAdvertise: true,
ConnectRetries: 2,
},
WriteDeadline: 3 * time.Second,
WriteDeadline: 3 * time.Second,
LameDuckDuration: 4 * time.Second,
}
fopts, err := ProcessConfigFile("./configs/test.conf")
if err != nil {

View File

@@ -1139,6 +1139,10 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
for s.isRunning() {
conn, err := l.Accept()
if err != nil {
if s.isLameDuckMode() {
s.ldmCh <- true
return
}
if ne, ok := err.(net.Error); ok && ne.Temporary() {
s.Debugf("Temporary Route Accept Errorf(%v), sleeping %dms",
ne, tmpDelay/time.Millisecond)

View File

@@ -136,6 +136,11 @@ type Server struct {
// not set any timeout.
monitoringServer *http.Server
profilingServer *http.Server
// LameDuck mode
ldm bool
ldmCh chan bool
ldmDoneCh chan bool
}
// Make sure all are 64bits for atomic use
@@ -594,6 +599,15 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
for s.isRunning() {
conn, err := l.Accept()
if err != nil {
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// We need to wait for lameDuckMode() to notify us that it
// is done with closing clients. If we return before that,
// the s.Start() function would return and process would exit.
<-s.ldmDoneCh
return
}
if ne, ok := err.(net.Error); ok && ne.Temporary() {
s.Errorf("Temporary Client Accept Error (%v), sleeping %dms",
ne, tmpDelay/time.Millisecond)
@@ -887,8 +901,8 @@ func (s *Server) createClient(conn net.Conn) *client {
// If server is not running, Shutdown() may have already gathered the
// list of connections to close. It won't contain this one, so we need
// to bail out now otherwise the readLoop started down there would not
// be interrupted.
if !s.running {
// be interrupted. Skip also if in lame duck mode.
if !s.running || s.ldm {
s.mu.Unlock()
return c
}
@@ -1540,3 +1554,98 @@ func (s *Server) serviceListeners() []net.Listener {
}
return listeners
}
// isLameDuckMode reports whether the server has entered lame duck mode.
// The flag is read under the server lock since it is flipped by
// lameDuckMode() concurrently with accept loops checking it.
func (s *Server) isLameDuckMode() bool {
	s.mu.Lock()
	ldm := s.ldm
	s.mu.Unlock()
	return ldm
}
// lameDuckMode closes the client and route listeners, then closes the
// existing client connections spread over the configured LameDuckDuration
// to avoid a reconnecting storm. Routes are closed first so that when a
// client is disconnected and reconnects to another server, that server
// does not send a route SUB back to this server. When all clients are
// closed the accept loop is released and the server is shut down.
func (s *Server) lameDuckMode() {
	s.mu.Lock()
	// If we are in the process (or done) or if listener is not yet
	// ready, just return.
	if s.ldm || s.listener == nil {
		s.mu.Unlock()
		return
	}
	expected := 1
	s.Noticef("Entering lame duck mode, stop accepting new clients")
	s.ldm = true
	s.ldmCh = make(chan bool, 1)
	s.ldmDoneCh = make(chan bool, 1)
	s.listener.Close()
	s.listener = nil
	if s.routeListener != nil {
		s.routeListener.Close()
		s.routeListener = nil
		expected++
	}
	s.mu.Unlock()
	// Wait for accept loop(s) to be done to make sure that no new
	// client can connect.
	for i := 0; i < expected; i++ {
		<-s.ldmCh
	}
	durInMs := int64(s.getOpts().LameDuckDuration / time.Millisecond)
	if durInMs < 1 {
		durInMs = 1
	}
	// Close routes before clients (see function comment).
	s.mu.Lock()
	routes := make([]*client, 0, len(s.routes))
	for _, r := range s.routes {
		routes = append(routes, r)
	}
	s.mu.Unlock()
	for _, r := range routes {
		r.setRouteNoReconnectOnClose()
		r.closeConnection(LameDuckMode)
	}
	// Now capture all clients.
	s.mu.Lock()
	numClients := int64(len(s.clients))
	clients := make([]*client, 0, numClients)
	for _, client := range s.clients {
		clients = append(clients, client)
	}
	s.mu.Unlock()
	// Compute the pacing: sd is the upper bound of the random sleep
	// between closes, pa is the batch size when there are more clients
	// than granted milliseconds. Guard numClients == 0 to avoid an
	// integer divide-by-zero when no client is connected.
	sd := int64(0)
	pa := 0
	if numClients > 0 {
		if numClients < durInMs {
			sd = (durInMs / numClients) * int64(time.Millisecond)
		} else {
			// There are more clients than granted milliseconds.
			// We will still sleep 10ms from time to time. We will
			// close by batch.
			sd = int64(10 * time.Millisecond)
			pa = int(numClients / durInMs)
		}
	}
	s.Noticef("Closing existing clients")
	for i, client := range clients {
		client.closeConnection(LameDuckMode)
		if sd == 0 {
			continue
		}
		// When closing by batch (pa > 0), sleep only between batches;
		// otherwise sleep after each client close.
		if pa == 0 || (i > 0 && i%pa == 0) {
			time.Sleep(time.Duration(rand.Int63n(sd)))
		}
	}
	s.Noticef("Lame duck mode ending")
	// Release the AcceptLoop. If we don't have the AcceptLoop
	// block, the process will exit as soon as s.Start() returns,
	// which it would when closing the listener.
	s.ldmDoneCh <- true
	s.Shutdown()
}

View File

@@ -19,6 +19,7 @@ import (
"net"
"os"
"strings"
"sync"
"testing"
"time"
@@ -698,3 +699,108 @@ func TestProfilingNoTimeout(t *testing.T) {
t.Fatalf("WriteTimeout should not be set, was set to %v", srv.WriteTimeout)
}
}
// TestLameDuckMode verifies that entering lame duck mode closes existing
// clients (spread over LameDuckDuration), rejects new connections while in
// that mode, and lets clients fail over to another routed server.
func TestLameDuckMode(t *testing.T) {
	optsA := DefaultOptions()
	optsA.Cluster.Host = "127.0.0.1"
	// Very short duration: with 50 clients the spread is paced by batches.
	optsA.LameDuckDuration = 10 * time.Millisecond
	srvA := RunServer(optsA)
	defer srvA.Shutdown()
	// Second server routed to A so clients closed on A can reconnect to B.
	optsB := DefaultOptions()
	optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
	srvB := RunServer(optsB)
	defer srvB.Shutdown()
	checkClusterFormed(t, srvA, srvB)
	total := 50
	// Helper that connects `total` clients to server A with a short
	// reconnect wait so fail-over to B happens quickly.
	connectClients := func() []*nats.Conn {
		ncs := make([]*nats.Conn, 0, total)
		for i := 0; i < total; i++ {
			nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port),
				nats.ReconnectWait(50*time.Millisecond))
			if err != nil {
				t.Fatalf("Error on connect: %v", err)
			}
			ncs = append(ncs, nc)
		}
		return ncs
	}
	ncs := connectClients()
	checkClientsCount(t, srvA, total)
	checkClientsCount(t, srvB, 0)
	start := time.Now()
	srvA.lameDuckMode()
	// Make sure that nothing bad happens if called twice
	srvA.lameDuckMode()
	elapsed := time.Since(start)
	// It should have taken more than the allotted time of 10ms since we had 50 clients.
	if elapsed <= optsA.LameDuckDuration {
		t.Fatalf("Expected to take more than %v, got %v", optsA.LameDuckDuration, elapsed)
	}
	// All clients should have moved from A to B.
	checkClientsCount(t, srvA, 0)
	checkClientsCount(t, srvB, total)
	// Check closed status on server A
	cz := pollConz(t, srvA, 1, "", &ConnzOptions{State: ConnClosed})
	if n := len(cz.Conns); n != total {
		t.Fatalf("Expected %v closed connections, got %v", total, n)
	}
	// Every closed connection must report LameDuckMode as its reason.
	for _, c := range cz.Conns {
		checkReason(t, c.Reason, LameDuckMode)
	}
	for _, nc := range ncs {
		nc.Close()
	}
	srvB.Shutdown()
	// Second run with a long duration: lame duck should finish well within
	// the allotted time since there are few clients relative to it.
	optsA.LameDuckDuration = 1000 * time.Millisecond
	srvA = RunServer(optsA)
	defer srvA.Shutdown()
	optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
	srvB = RunServer(optsB)
	defer srvB.Shutdown()
	checkClusterFormed(t, srvA, srvB)
	ncs = connectClients()
	checkClientsCount(t, srvA, total)
	checkClientsCount(t, srvB, 0)
	// Run lame duck in a goroutine so we can probe the server while it
	// is in progress.
	wg := sync.WaitGroup{}
	wg.Add(1)
	start = time.Now()
	go func() {
		srvA.lameDuckMode()
		wg.Done()
	}()
	// Check that while in lameDuckMode, it is not possible to connect
	// to the server.
	for !srvA.isLameDuckMode() {
		time.Sleep(15 * time.Millisecond)
	}
	if _, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)); err != nats.ErrNoServers {
		t.Fatalf("Expected %v, got %v", nats.ErrNoServers, err)
	}
	wg.Wait()
	elapsed = time.Since(start)
	checkClientsCount(t, srvA, 0)
	checkClientsCount(t, srvB, total)
	if elapsed > optsA.LameDuckDuration {
		t.Fatalf("Expected to not take more than %v, got %v", optsA.LameDuckDuration, elapsed)
	}
	for _, nc := range ncs {
		nc.Close()
	}
}

View File

@@ -24,6 +24,8 @@ import (
const (
reopenLogCode = 128
reopenLogCmd = svc.Cmd(reopenLogCode)
ldmCode = 129
ldmCmd = svc.Cmd(ldmCode)
acceptReopenLog = svc.Accepted(reopenLogCode)
)
@@ -87,6 +89,8 @@ loop:
case reopenLogCmd:
// File log re-open for rotating file logs.
w.server.ReOpenLogFile()
case ldmCmd:
w.server.lameDuckMode()
case svc.ParamChange:
if err := w.server.Reload(); err != nil {
w.server.Errorf("Failed to reload server configuration: %s", err)

View File

@@ -40,11 +40,9 @@ func (s *Server) handleSignals() {
}
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGHUP)
signal.Notify(c, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGHUP)
s.grWG.Add(1)
go func() {
defer s.grWG.Done()
for {
select {
case sig := <-c:
@@ -56,6 +54,8 @@ func (s *Server) handleSignals() {
case syscall.SIGUSR1:
// File log re-open for rotating file logs.
s.ReOpenLogFile()
case syscall.SIGUSR2:
s.lameDuckMode()
case syscall.SIGHUP:
// Config reload.
if err := s.Reload(); err != nil {
@@ -111,6 +111,8 @@ func ProcessSignal(command Command, pidStr string) error {
err = kill(pid, syscall.SIGUSR1)
case CommandReload:
err = kill(pid, syscall.SIGHUP)
case commandLDMode:
err = kill(pid, syscall.SIGUSR2)
default:
err = fmt.Errorf("unknown signal %q", command)
}

View File

@@ -312,3 +312,29 @@ func TestProcessSignalReloadProcess(t *testing.T) {
t.Fatal("Expected kill to be called")
}
}
// TestProcessSignalLameDuckMode verifies that the "ldm" command is
// translated into a SIGUSR2 sent to the target pid.
func TestProcessSignalLameDuckMode(t *testing.T) {
	// Stub out kill and restore it when the test completes.
	origKill := kill
	defer func() {
		kill = origKill
	}()
	invoked := false
	kill = func(pid int, signal syscall.Signal) error {
		invoked = true
		if pid != 123 {
			t.Fatalf("pid is incorrect.\nexpected: 123\ngot: %d", pid)
		}
		if signal != syscall.SIGUSR2 {
			t.Fatalf("signal is incorrect.\nexpected: sigusr2\ngot: %v", signal)
		}
		return nil
	}
	if err := ProcessSignal(commandLDMode, "123"); err != nil {
		t.Fatalf("ProcessSignal failed: %v", err)
	}
	if !invoked {
		t.Fatal("Expected kill to be called")
	}
}

View File

@@ -76,6 +76,9 @@ func ProcessSignal(command Command, service string) error {
case CommandReload:
cmd = svc.ParamChange
to = svc.Running
case commandLDMode:
cmd = ldmCmd
to = svc.Running
default:
return fmt.Errorf("unknown signal %q", command)
}