From 0067c3bb042ba98366338994ad837e54b9648cac Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 19 Oct 2018 19:07:37 -0600 Subject: [PATCH] Added support for lame duck mode When receiving the SIGUSR2 signal (or -sl ldm), the server stops accepting new clients, closes route connections and spreads the closing of client connections over a configurable lame duck duration (default is 30sec). This helps prevent a storm of client reconnects when a server needs to be shut down. Signed-off-by: Ivan Kozlovic --- server/client.go | 5 +- server/config_check_test.go | 9 +++ server/configs/test.conf | 2 + server/const.go | 7 +++ server/monitor.go | 6 +- server/opts.go | 12 ++++ server/opts_test.go | 49 ++++++++-------- server/route.go | 4 ++ server/server.go | 113 +++++++++++++++++++++++++++++++++++- server/server_test.go | 106 +++++++++++++++++++++++++++++++++ server/service_windows.go | 4 ++ server/signal.go | 8 ++- server/signal_test.go | 26 +++++++++ server/signal_windows.go | 3 + 14 files changed, 322 insertions(+), 32 deletions(-) diff --git a/server/client.go b/server/client.go index 436c26a4..0400e13e 100644 --- a/server/client.go +++ b/server/client.go @@ -128,6 +128,7 @@ const ( DuplicateRoute RouteRemoved ServerShutdown + LameDuckMode ) type client struct { @@ -2103,8 +2104,8 @@ func (c *client) closeConnection(reason ClosedState) { c.sl.RemoveBatch(subs) if srv != nil { - // This is a route that disconnected... - if len(connectURLs) > 0 { + // This is a route that disconnected, but we are not in lame duck mode... + if len(connectURLs) > 0 && !srv.isLameDuckMode() { // Unless disabled, possibly update the server's INFO protocol // and send to clients that know how to handle async INFOs. 
if !srv.getOpts().Cluster.NoAdvertise { diff --git a/server/config_check_test.go b/server/config_check_test.go index 782072e7..61aa5561 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -1060,6 +1060,15 @@ func TestConfigCheck(t *testing.T) { errorLine: 3, errorPos: 5, }, + { + name: "invalid lame_duck_duration type", + config: ` + lame_duck_duration: abc + `, + err: errors.New(`error parsing lame_duck_duration: time: invalid duration abc`), + errorLine: 2, + errorPos: 3, + }, } checkConfig := func(config string) error { diff --git a/server/configs/test.conf b/server/configs/test.conf index 01c75f00..21e420eb 100644 --- a/server/configs/test.conf +++ b/server/configs/test.conf @@ -44,3 +44,5 @@ ping_max: 3 # how long server can block on a socket write to a client write_deadline: "3s" + +lame_duck_duration: "4s" \ No newline at end of file diff --git a/server/const.go b/server/const.go index 4e67ad31..f807fc48 100644 --- a/server/const.go +++ b/server/const.go @@ -26,6 +26,9 @@ const ( CommandQuit = Command("quit") CommandReopen = Command("reopen") CommandReload = Command("reload") + + // private for now + commandLDMode = Command("ldm") ) var ( @@ -127,4 +130,8 @@ const ( // DEFAULT_TTL_AE_RESPONSE_MAP DEFAULT_TTL_AE_RESPONSE_MAP = 10 * time.Minute + + // DEFAULT_LAME_DUCK_DURATION is the time in which the server spreads + // the closing of clients when signaled to go in lame duck mode. + DEFAULT_LAME_DUCK_DURATION = 30 * time.Second ) diff --git a/server/monitor.go b/server/monitor.go index add7f1c6..4928f38b 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -395,8 +395,8 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time) { func (c *client) getRTT() string { if c.rtt == 0 { // If a real client, go ahead and send ping now to get a value - // for RTT. For tests and telnet, etc skip. - if c.flags.isSet(connectReceived) && c.opts.Lang != "" { + // for RTT. For tests and telnet, or if client is closing, etc skip. 
+ if !c.flags.isSet(clearConnection) && c.flags.isSet(connectReceived) && c.opts.Lang != "" { c.sendPing() } return "" @@ -1038,6 +1038,8 @@ func (reason ClosedState) String() string { return "Route Removed" case ServerShutdown: return "Server Shutdown" + case LameDuckMode: + return "Lame Duck Mode" } return "Unknown State" } diff --git a/server/opts.go b/server/opts.go index f0171d5e..4ce93479 100644 --- a/server/opts.go +++ b/server/opts.go @@ -95,6 +95,7 @@ type Options struct { WriteDeadline time.Duration `json:"-"` RQSubsSweep time.Duration `json:"-"` MaxClosedClients int `json:"-"` + LameDuckDuration time.Duration `json:"-"` CustomClientAuthentication Authentication `json:"-"` CustomRouterAuthentication Authentication `json:"-"` @@ -409,6 +410,14 @@ func (o *Options) ProcessConfigFile(configFile string) error { } warnings = append(warnings, err) } + case "lame_duck_duration": + dur, err := time.ParseDuration(v.(string)) + if err != nil { + err := &configErr{tk, fmt.Sprintf("error parsing lame_duck_duration: %v", err)} + errors = append(errors, err) + continue + } + o.LameDuckDuration = dur default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -1782,6 +1791,9 @@ func processOptions(opts *Options) { if opts.MaxClosedClients == 0 { opts.MaxClosedClients = DEFAULT_MAX_CLOSED_CLIENTS } + if opts.LameDuckDuration == 0 { + opts.LameDuckDuration = DEFAULT_LAME_DUCK_DURATION + } } // ConfigureOptions accepts a flag set and augment it with NATS Server diff --git a/server/opts_test.go b/server/opts_test.go index fd5d5ce8..435d965a 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -43,6 +43,7 @@ func TestDefaultOptions(t *testing.T) { WriteDeadline: DEFAULT_FLUSH_DEADLINE, RQSubsSweep: DEFAULT_REMOTE_QSUBS_SWEEPER, MaxClosedClients: DEFAULT_MAX_CLOSED_CLIENTS, + LameDuckDuration: DEFAULT_LAME_DUCK_DURATION, } opts := &Options{} @@ -66,28 +67,29 @@ func TestOptions_RandomPort(t *testing.T) { func TestConfigFile(t *testing.T) { golden := 
&Options{ - ConfigFile: "./configs/test.conf", - Host: "127.0.0.1", - Port: 4242, - Username: "derek", - Password: "porkchop", - AuthTimeout: 1.0, - Debug: false, - Trace: true, - Logtime: false, - HTTPPort: 8222, - PidFile: "/tmp/gnatsd.pid", - ProfPort: 6543, - Syslog: true, - RemoteSyslog: "udp://foo.com:33", - MaxControlLine: 2048, - MaxPayload: 65536, - MaxConn: 100, - MaxSubs: 1000, - MaxPending: 10000000, - PingInterval: 60 * time.Second, - MaxPingsOut: 3, - WriteDeadline: 3 * time.Second, + ConfigFile: "./configs/test.conf", + Host: "127.0.0.1", + Port: 4242, + Username: "derek", + Password: "porkchop", + AuthTimeout: 1.0, + Debug: false, + Trace: true, + Logtime: false, + HTTPPort: 8222, + PidFile: "/tmp/gnatsd.pid", + ProfPort: 6543, + Syslog: true, + RemoteSyslog: "udp://foo.com:33", + MaxControlLine: 2048, + MaxPayload: 65536, + MaxConn: 100, + MaxSubs: 1000, + MaxPending: 10000000, + PingInterval: 60 * time.Second, + MaxPingsOut: 3, + WriteDeadline: 3 * time.Second, + LameDuckDuration: 4 * time.Second, } opts, err := ProcessConfigFile("./configs/test.conf") @@ -248,7 +250,8 @@ func TestMergeOverrides(t *testing.T) { NoAdvertise: true, ConnectRetries: 2, }, - WriteDeadline: 3 * time.Second, + WriteDeadline: 3 * time.Second, + LameDuckDuration: 4 * time.Second, } fopts, err := ProcessConfigFile("./configs/test.conf") if err != nil { diff --git a/server/route.go b/server/route.go index 7e82d627..ba8d736b 100644 --- a/server/route.go +++ b/server/route.go @@ -1139,6 +1139,10 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { for s.isRunning() { conn, err := l.Accept() if err != nil { + if s.isLameDuckMode() { + s.ldmCh <- true + return + } if ne, ok := err.(net.Error); ok && ne.Temporary() { s.Debugf("Temporary Route Accept Errorf(%v), sleeping %dms", ne, tmpDelay/time.Millisecond) diff --git a/server/server.go b/server/server.go index 0b30b05b..45235e9c 100644 --- a/server/server.go +++ b/server/server.go @@ -136,6 +136,11 @@ type Server struct { // 
not set any timeout. monitoringServer *http.Server profilingServer *http.Server + + // LameDuck mode + ldm bool + ldmCh chan bool + ldmDoneCh chan bool } // Make sure all are 64bits for atomic use @@ -594,6 +599,15 @@ func (s *Server) AcceptLoop(clr chan struct{}) { for s.isRunning() { conn, err := l.Accept() if err != nil { + if s.isLameDuckMode() { + // Signal that we are not accepting new clients + s.ldmCh <- true + // We need to wait for lameDuckMode() to notify us that it + // is done with closing clients. If we return before that, + // the s.Start() function would return and process would exit. + <-s.ldmDoneCh + return + } if ne, ok := err.(net.Error); ok && ne.Temporary() { s.Errorf("Temporary Client Accept Error (%v), sleeping %dms", ne, tmpDelay/time.Millisecond) @@ -887,8 +901,8 @@ func (s *Server) createClient(conn net.Conn) *client { // If server is not running, Shutdown() may have already gathered the // list of connections to close. It won't contain this one, so we need // to bail out now otherwise the readLoop started down there would not - // be interrupted. - if !s.running { + // be interrupted. Skip also if in lame duck mode. + if !s.running || s.ldm { s.mu.Unlock() return c } @@ -1540,3 +1554,98 @@ func (s *Server) serviceListeners() []net.Listener { } return listeners } + +// Returns true if in lame duck mode. +func (s *Server) isLameDuckMode() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.ldm +} + +// This function will close client and route listeners +// then close clients at some interval to avoid a reconnecting storm. +// Note that the routes are closed before so that when a client is +// disconnected and reconnects to another server, that server does +// not send to this server a route SUB. +func (s *Server) lameDuckMode() { + s.mu.Lock() + // If we are in the process (or done) or if listener is not yet + // ready, just return. 
+ if s.ldm || s.listener == nil { + s.mu.Unlock() + return + } + expected := 1 + s.Noticef("Entering lame duck mode, stop accepting new clients") + s.ldm = true + s.ldmCh = make(chan bool, 1) + s.ldmDoneCh = make(chan bool, 1) + s.listener.Close() + s.listener = nil + if s.routeListener != nil { + s.routeListener.Close() + s.routeListener = nil + expected++ + } + s.mu.Unlock() + + // Wait for accept loop to be done to make sure that no new + // client can connect + for i := 0; i < expected; i++ { + <-s.ldmCh + } + + durInMs := int64(s.getOpts().LameDuckDuration / time.Millisecond) + if durInMs < 1 { + durInMs = 1 + } + + // Close routes + s.mu.Lock() + routes := make([]*client, 0, len(s.routes)) + for _, r := range s.routes { + routes = append(routes, r) + } + s.mu.Unlock() + for _, r := range routes { + r.setRouteNoReconnectOnClose() + r.closeConnection(LameDuckMode) + } + + // Now capture all clients + s.mu.Lock() + numClients := int64(len(s.clients)) + sd := int64(0) + pa := 0 + if numClients < durInMs { + sd = (durInMs / numClients) * int64(time.Millisecond) + } else { + // There are more clients than granted milliseconds. + // We will still sleep 10ms from time to time. We will + // close by batch. + sd = int64(10 * time.Millisecond) + pa = int(numClients / durInMs) + } + clients := make([]*client, len(s.clients)) + i := 0 + for _, client := range s.clients { + clients[i] = client + i++ + } + s.mu.Unlock() + + s.Noticef("Closing existing clients") + for i, client := range clients { + client.closeConnection(LameDuckMode) + if sd > 0 || (i > 0 && i%pa == 0) { + time.Sleep(time.Duration(rand.Int63n(sd))) + } + } + + s.Noticef("Lame duck mode ending") + // Release the AcceptLoop. 
If we don't have the AcceptLoop + // block, the process will exit as soon as s.Start() returns, + // which it would when closing the listener + s.ldmDoneCh <- true + s.Shutdown() +} diff --git a/server/server_test.go b/server/server_test.go index 20b8dbe2..34b36411 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -19,6 +19,7 @@ import ( "net" "os" "strings" + "sync" "testing" "time" @@ -698,3 +699,108 @@ func TestProfilingNoTimeout(t *testing.T) { t.Fatalf("WriteTimeout should not be set, was set to %v", srv.WriteTimeout) } } + +func TestLameDuckMode(t *testing.T) { + optsA := DefaultOptions() + optsA.Cluster.Host = "127.0.0.1" + optsA.LameDuckDuration = 10 * time.Millisecond + srvA := RunServer(optsA) + defer srvA.Shutdown() + + optsB := DefaultOptions() + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port)) + srvB := RunServer(optsB) + defer srvB.Shutdown() + + checkClusterFormed(t, srvA, srvB) + + total := 50 + connectClients := func() []*nats.Conn { + ncs := make([]*nats.Conn, 0, total) + for i := 0; i < total; i++ { + nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port), + nats.ReconnectWait(50*time.Millisecond)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + ncs = append(ncs, nc) + } + return ncs + } + ncs := connectClients() + + checkClientsCount(t, srvA, total) + checkClientsCount(t, srvB, 0) + + start := time.Now() + srvA.lameDuckMode() + // Make sure that nothing bad happens if called twice + srvA.lameDuckMode() + elapsed := time.Since(start) + // It should have taken more than the allotted time of 10ms since we had 50 clients. 
+ if elapsed <= optsA.LameDuckDuration { + t.Fatalf("Expected to take more than %v, got %v", optsA.LameDuckDuration, elapsed) + } + + checkClientsCount(t, srvA, 0) + checkClientsCount(t, srvB, total) + + // Check closed status on server A + cz := pollConz(t, srvA, 1, "", &ConnzOptions{State: ConnClosed}) + if n := len(cz.Conns); n != total { + t.Fatalf("Expected %v closed connections, got %v", total, n) + } + for _, c := range cz.Conns { + checkReason(t, c.Reason, LameDuckMode) + } + + for _, nc := range ncs { + nc.Close() + } + + srvB.Shutdown() + + optsA.LameDuckDuration = 1000 * time.Millisecond + srvA = RunServer(optsA) + defer srvA.Shutdown() + + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port)) + srvB = RunServer(optsB) + defer srvB.Shutdown() + + checkClusterFormed(t, srvA, srvB) + + ncs = connectClients() + + checkClientsCount(t, srvA, total) + checkClientsCount(t, srvB, 0) + + wg := sync.WaitGroup{} + wg.Add(1) + start = time.Now() + go func() { + srvA.lameDuckMode() + wg.Done() + }() + // Check that while in lameDuckMode, it is not possible to connect + // to the server. 
+ for !srvA.isLameDuckMode() { + time.Sleep(15 * time.Millisecond) + } + if _, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)); err != nats.ErrNoServers { + t.Fatalf("Expected %v, got %v", nats.ErrNoServers, err) + } + wg.Wait() + elapsed = time.Since(start) + + checkClientsCount(t, srvA, 0) + checkClientsCount(t, srvB, total) + + if elapsed > optsA.LameDuckDuration { + t.Fatalf("Expected to not take more than %v, got %v", optsA.LameDuckDuration, elapsed) + } + + for _, nc := range ncs { + nc.Close() + } +} diff --git a/server/service_windows.go b/server/service_windows.go index 0b9fa949..fbcd854d 100644 --- a/server/service_windows.go +++ b/server/service_windows.go @@ -24,6 +24,8 @@ import ( const ( reopenLogCode = 128 reopenLogCmd = svc.Cmd(reopenLogCode) + ldmCode = 129 + ldmCmd = svc.Cmd(ldmCode) acceptReopenLog = svc.Accepted(reopenLogCode) ) @@ -87,6 +89,8 @@ loop: case reopenLogCmd: // File log re-open for rotating file logs. w.server.ReOpenLogFile() + case ldmCmd: + w.server.lameDuckMode() case svc.ParamChange: if err := w.server.Reload(); err != nil { w.server.Errorf("Failed to reload server configuration: %s", err) diff --git a/server/signal.go b/server/signal.go index 84022149..d5329c77 100644 --- a/server/signal.go +++ b/server/signal.go @@ -40,11 +40,9 @@ func (s *Server) handleSignals() { } c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGHUP) + signal.Notify(c, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGHUP) - s.grWG.Add(1) go func() { - defer s.grWG.Done() for { select { case sig := <-c: @@ -56,6 +54,8 @@ func (s *Server) handleSignals() { case syscall.SIGUSR1: // File log re-open for rotating file logs. s.ReOpenLogFile() + case syscall.SIGUSR2: + s.lameDuckMode() case syscall.SIGHUP: // Config reload. 
if err := s.Reload(); err != nil { @@ -111,6 +111,8 @@ func ProcessSignal(command Command, pidStr string) error { err = kill(pid, syscall.SIGUSR1) case CommandReload: err = kill(pid, syscall.SIGHUP) + case commandLDMode: + err = kill(pid, syscall.SIGUSR2) default: err = fmt.Errorf("unknown signal %q", command) } diff --git a/server/signal_test.go b/server/signal_test.go index 3fa27559..de5e007d 100644 --- a/server/signal_test.go +++ b/server/signal_test.go @@ -312,3 +312,29 @@ func TestProcessSignalReloadProcess(t *testing.T) { t.Fatal("Expected kill to be called") } } + +func TestProcessSignalLameDuckMode(t *testing.T) { + killBefore := kill + called := false + kill = func(pid int, signal syscall.Signal) error { + called = true + if pid != 123 { + t.Fatalf("pid is incorrect.\nexpected: 123\ngot: %d", pid) + } + if signal != syscall.SIGUSR2 { + t.Fatalf("signal is incorrect.\nexpected: sigusr2\ngot: %v", signal) + } + return nil + } + defer func() { + kill = killBefore + }() + + if err := ProcessSignal(commandLDMode, "123"); err != nil { + t.Fatalf("ProcessSignal failed: %v", err) + } + + if !called { + t.Fatal("Expected kill to be called") + } +} diff --git a/server/signal_windows.go b/server/signal_windows.go index 368077dd..459a9ed4 100644 --- a/server/signal_windows.go +++ b/server/signal_windows.go @@ -76,6 +76,9 @@ func ProcessSignal(command Command, service string) error { case CommandReload: cmd = svc.ParamChange to = svc.Running + case commandLDMode: + cmd = ldmCmd + to = svc.Running default: return fmt.Errorf("unknown signal %q", command) }