Merge pull request #780 from nats-io/lame_duck_mode

Added support for lame duck mode
This commit is contained in:
Ivan Kozlovic
2018-10-23 14:55:32 -06:00
committed by GitHub
13 changed files with 341 additions and 33 deletions

View File

@@ -2103,8 +2103,8 @@ func (c *client) closeConnection(reason ClosedState) {
c.sl.RemoveBatch(subs)
if srv != nil {
// This is a route that disconnected...
if len(connectURLs) > 0 {
// This is a route that disconnected, but we are not in lame duck mode...
if len(connectURLs) > 0 && !srv.isLameDuckMode() {
// Unless disabled, possibly update the server's INFO protocol
// and send to clients that know how to handle async INFOs.
if !srv.getOpts().Cluster.NoAdvertise {

View File

@@ -1060,6 +1060,15 @@ func TestConfigCheck(t *testing.T) {
errorLine: 3,
errorPos: 5,
},
{
name: "invalid lame_duck_duration type",
config: `
lame_duck_duration: abc
`,
err: errors.New(`error parsing lame_duck_duration: time: invalid duration abc`),
errorLine: 2,
errorPos: 3,
},
}
checkConfig := func(config string) error {

View File

@@ -44,3 +44,5 @@ ping_max: 3
# how long server can block on a socket write to a client
write_deadline: "3s"
lame_duck_duration: "4s"

View File

@@ -26,6 +26,9 @@ const (
CommandQuit = Command("quit")
CommandReopen = Command("reopen")
CommandReload = Command("reload")
// private for now
commandLDMode = Command("ldm")
)
var (
@@ -127,4 +130,8 @@ const (
// DEFAULT_TTL_AE_RESPONSE_MAP
DEFAULT_TTL_AE_RESPONSE_MAP = 10 * time.Minute
// DEFAULT_LAME_DUCK_DURATION is the time in which the server spreads
// the closing of clients when signaled to go in lame duck mode.
DEFAULT_LAME_DUCK_DURATION = 30 * time.Second
)

View File

@@ -395,8 +395,8 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time) {
func (c *client) getRTT() string {
if c.rtt == 0 {
// If a real client, go ahead and send ping now to get a value
// for RTT. For tests and telnet, etc skip.
if c.flags.isSet(connectReceived) && c.opts.Lang != "" {
// for RTT. For tests and telnet, or if client is closing, etc skip.
if !c.flags.isSet(clearConnection) && c.flags.isSet(connectReceived) && c.opts.Lang != "" {
c.sendPing()
}
return ""

View File

@@ -95,6 +95,7 @@ type Options struct {
WriteDeadline time.Duration `json:"-"`
RQSubsSweep time.Duration `json:"-"`
MaxClosedClients int `json:"-"`
LameDuckDuration time.Duration `json:"-"`
CustomClientAuthentication Authentication `json:"-"`
CustomRouterAuthentication Authentication `json:"-"`
@@ -409,6 +410,14 @@ func (o *Options) ProcessConfigFile(configFile string) error {
}
warnings = append(warnings, err)
}
case "lame_duck_duration":
dur, err := time.ParseDuration(v.(string))
if err != nil {
err := &configErr{tk, fmt.Sprintf("error parsing lame_duck_duration: %v", err)}
errors = append(errors, err)
continue
}
o.LameDuckDuration = dur
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
@@ -1782,6 +1791,9 @@ func processOptions(opts *Options) {
if opts.MaxClosedClients == 0 {
opts.MaxClosedClients = DEFAULT_MAX_CLOSED_CLIENTS
}
if opts.LameDuckDuration == 0 {
opts.LameDuckDuration = DEFAULT_LAME_DUCK_DURATION
}
}
// ConfigureOptions accepts a flag set and augment it with NATS Server

View File

@@ -43,6 +43,7 @@ func TestDefaultOptions(t *testing.T) {
WriteDeadline: DEFAULT_FLUSH_DEADLINE,
RQSubsSweep: DEFAULT_REMOTE_QSUBS_SWEEPER,
MaxClosedClients: DEFAULT_MAX_CLOSED_CLIENTS,
LameDuckDuration: DEFAULT_LAME_DUCK_DURATION,
}
opts := &Options{}
@@ -66,28 +67,29 @@ func TestOptions_RandomPort(t *testing.T) {
func TestConfigFile(t *testing.T) {
golden := &Options{
ConfigFile: "./configs/test.conf",
Host: "127.0.0.1",
Port: 4242,
Username: "derek",
Password: "porkchop",
AuthTimeout: 1.0,
Debug: false,
Trace: true,
Logtime: false,
HTTPPort: 8222,
PidFile: "/tmp/gnatsd.pid",
ProfPort: 6543,
Syslog: true,
RemoteSyslog: "udp://foo.com:33",
MaxControlLine: 2048,
MaxPayload: 65536,
MaxConn: 100,
MaxSubs: 1000,
MaxPending: 10000000,
PingInterval: 60 * time.Second,
MaxPingsOut: 3,
WriteDeadline: 3 * time.Second,
ConfigFile: "./configs/test.conf",
Host: "127.0.0.1",
Port: 4242,
Username: "derek",
Password: "porkchop",
AuthTimeout: 1.0,
Debug: false,
Trace: true,
Logtime: false,
HTTPPort: 8222,
PidFile: "/tmp/gnatsd.pid",
ProfPort: 6543,
Syslog: true,
RemoteSyslog: "udp://foo.com:33",
MaxControlLine: 2048,
MaxPayload: 65536,
MaxConn: 100,
MaxSubs: 1000,
MaxPending: 10000000,
PingInterval: 60 * time.Second,
MaxPingsOut: 3,
WriteDeadline: 3 * time.Second,
LameDuckDuration: 4 * time.Second,
}
opts, err := ProcessConfigFile("./configs/test.conf")
@@ -248,7 +250,8 @@ func TestMergeOverrides(t *testing.T) {
NoAdvertise: true,
ConnectRetries: 2,
},
WriteDeadline: 3 * time.Second,
WriteDeadline: 3 * time.Second,
LameDuckDuration: 4 * time.Second,
}
fopts, err := ProcessConfigFile("./configs/test.conf")
if err != nil {

View File

@@ -136,6 +136,10 @@ type Server struct {
// not set any timeout.
monitoringServer *http.Server
profilingServer *http.Server
// LameDuck mode
ldm bool
ldmCh chan bool
}
// Make sure all are 64bits for atomic use
@@ -434,6 +438,7 @@ func (s *Server) Shutdown() {
s.mu.Unlock()
return
}
s.Noticef("Server Exiting..")
opts := s.getOpts()
@@ -594,6 +599,13 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
for s.isRunning() {
conn, err := l.Accept()
if err != nil {
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// Now wait for the Shutdown...
<-s.quitCh
return
}
if ne, ok := err.(net.Error); ok && ne.Temporary() {
s.Errorf("Temporary Client Accept Error (%v), sleeping %dms",
ne, tmpDelay/time.Millisecond)
@@ -613,7 +625,6 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
s.grWG.Done()
})
}
s.Noticef("Server Exiting..")
s.done <- true
}
@@ -887,8 +898,8 @@ func (s *Server) createClient(conn net.Conn) *client {
// If server is not running, Shutdown() may have already gathered the
// list of connections to close. It won't contain this one, so we need
// to bail out now otherwise the readLoop started down there would not
// be interrupted.
if !s.running {
// be interrupted. Skip also if in lame duck mode.
if !s.running || s.ldm {
s.mu.Unlock()
return c
}
@@ -1540,3 +1551,82 @@ func (s *Server) serviceListeners() []net.Listener {
}
return listeners
}
// Returns true if in lame duck mode.
func (s *Server) isLameDuckMode() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.ldm
}
// This function will close the client listener then close the clients
// at some interval to avoid a reconnecting storm.
func (s *Server) lameDuckMode() {
s.mu.Lock()
// Check if there is actually anything to do
if s.shutdown || s.ldm || s.listener == nil {
s.mu.Unlock()
return
}
s.Noticef("Entering lame duck mode, stop accepting new clients")
s.ldm = true
s.ldmCh = make(chan bool, 1)
s.listener.Close()
s.listener = nil
s.mu.Unlock()
// Wait for accept loop to be done to make sure that no new
// client can connect
<-s.ldmCh
s.mu.Lock()
// Need to recheck few things
if s.shutdown || len(s.clients) == 0 {
s.mu.Unlock()
// If there is no client, we need to call Shutdown() to complete
// the LDMode. If server has been shutdown while lock was released,
// calling Shutdown() should be no-op.
s.Shutdown()
return
}
dur := int64(s.getOpts().LameDuckDuration)
numClients := int64(len(s.clients))
batch := 1
// Sleep interval between each client connection close.
si := dur / numClients
if si < 1 {
// Should not happen (except in test with very small LD duration), but
// if there are too many clients, batch the number of close and
// use a tiny sleep interval that will result in yield likely.
si = 1
batch = int(numClients / dur)
}
// Now capture all clients
clients := make([]*client, 0, len(s.clients))
for _, client := range s.clients {
clients = append(clients, client)
}
s.mu.Unlock()
t := time.NewTimer(10 * time.Second)
s.Noticef("Closing existing clients")
for i, client := range clients {
client.closeConnection(ServerShutdown)
if batch == 1 || i%batch == 0 {
// We pick a random interval which will be at least si/2
v := rand.Int63n(si)
if v < si/2 {
v = si / 2
}
t.Reset(time.Duration(v))
// Sleep for given interval or bail out if kicked by Shutdown().
select {
case <-t.C:
case <-s.quitCh:
t.Stop()
return
}
}
}
s.Shutdown()
}

View File

@@ -698,3 +698,153 @@ func TestProfilingNoTimeout(t *testing.T) {
t.Fatalf("WriteTimeout should not be set, was set to %v", srv.WriteTimeout)
}
}
func TestLameDuckMode(t *testing.T) {
optsA := DefaultOptions()
optsA.Cluster.Host = "127.0.0.1"
srvA := RunServer(optsA)
defer srvA.Shutdown()
// Check that if there is no client, server is shutdown
srvA.lameDuckMode()
srvA.mu.Lock()
shutdown := srvA.shutdown
srvA.mu.Unlock()
if !shutdown {
t.Fatalf("Server should have shutdown")
}
optsA.LameDuckDuration = 10 * time.Nanosecond
srvA = RunServer(optsA)
defer srvA.Shutdown()
optsB := DefaultOptions()
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvB := RunServer(optsB)
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
total := 50
connectClients := func() []*nats.Conn {
ncs := make([]*nats.Conn, 0, total)
for i := 0; i < total; i++ {
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port),
nats.ReconnectWait(50*time.Millisecond))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
ncs = append(ncs, nc)
}
return ncs
}
stopClientsAndSrvB := func(ncs []*nats.Conn) {
for _, nc := range ncs {
nc.Close()
}
srvB.Shutdown()
}
ncs := connectClients()
checkClientsCount(t, srvA, total)
checkClientsCount(t, srvB, 0)
start := time.Now()
srvA.lameDuckMode()
// Make sure that nothing bad happens if called twice
srvA.lameDuckMode()
// Wait that shutdown completes
elapsed := time.Since(start)
// It should have taken more than the allotted time of 10ms since we had 50 clients.
if elapsed <= optsA.LameDuckDuration {
t.Fatalf("Expected to take more than %v, got %v", optsA.LameDuckDuration, elapsed)
}
checkClientsCount(t, srvA, 0)
checkClientsCount(t, srvB, total)
// Check closed status on server A
cz := pollConz(t, srvA, 1, "", &ConnzOptions{State: ConnClosed})
if n := len(cz.Conns); n != total {
t.Fatalf("Expected %v closed connections, got %v", total, n)
}
for _, c := range cz.Conns {
checkReason(t, c.Reason, ServerShutdown)
}
stopClientsAndSrvB(ncs)
optsA.LameDuckDuration = time.Second
srvA = RunServer(optsA)
defer srvA.Shutdown()
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvB = RunServer(optsB)
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
ncs = connectClients()
checkClientsCount(t, srvA, total)
checkClientsCount(t, srvB, 0)
start = time.Now()
go srvA.lameDuckMode()
// Check that while in lameDuckMode, it is not possible to connect
// to the server. Wait to be in LD mode first
checkFor(t, 500*time.Millisecond, 15*time.Millisecond, func() error {
srvA.mu.Lock()
ldm := srvA.ldm
srvA.mu.Unlock()
if !ldm {
return fmt.Errorf("Did not reach lame duck mode")
}
return nil
})
if _, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)); err != nats.ErrNoServers {
t.Fatalf("Expected %v, got %v", nats.ErrNoServers, err)
}
srvA.grWG.Wait()
elapsed = time.Since(start)
checkClientsCount(t, srvA, 0)
checkClientsCount(t, srvB, total)
if elapsed > optsA.LameDuckDuration {
t.Fatalf("Expected to not take more than %v, got %v", optsA.LameDuckDuration, elapsed)
}
stopClientsAndSrvB(ncs)
// Now check that we can shutdown server while in LD mode.
optsA.LameDuckDuration = 60 * time.Second
srvA = RunServer(optsA)
defer srvA.Shutdown()
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port))
srvB = RunServer(optsB)
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
ncs = connectClients()
checkClientsCount(t, srvA, total)
checkClientsCount(t, srvB, 0)
start = time.Now()
go srvA.lameDuckMode()
time.Sleep(100 * time.Millisecond)
srvA.Shutdown()
elapsed = time.Since(start)
// Make sure that it did not take that long
if elapsed > time.Second {
t.Fatalf("Took too long: %v", elapsed)
}
checkClientsCount(t, srvA, 0)
checkClientsCount(t, srvB, total)
stopClientsAndSrvB(ncs)
}

View File

@@ -24,6 +24,8 @@ import (
const (
reopenLogCode = 128
reopenLogCmd = svc.Cmd(reopenLogCode)
ldmCode = 129
ldmCmd = svc.Cmd(ldmCode)
acceptReopenLog = svc.Accepted(reopenLogCode)
)
@@ -87,6 +89,8 @@ loop:
case reopenLogCmd:
// File log re-open for rotating file logs.
w.server.ReOpenLogFile()
case ldmCmd:
go w.server.lameDuckMode()
case svc.ParamChange:
if err := w.server.Reload(); err != nil {
w.server.Errorf("Failed to reload server configuration: %s", err)

View File

@@ -40,11 +40,9 @@ func (s *Server) handleSignals() {
}
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGHUP)
signal.Notify(c, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGHUP)
s.grWG.Add(1)
go func() {
defer s.grWG.Done()
for {
select {
case sig := <-c:
@@ -56,6 +54,8 @@ func (s *Server) handleSignals() {
case syscall.SIGUSR1:
// File log re-open for rotating file logs.
s.ReOpenLogFile()
case syscall.SIGUSR2:
go s.lameDuckMode()
case syscall.SIGHUP:
// Config reload.
if err := s.Reload(); err != nil {
@@ -111,6 +111,8 @@ func ProcessSignal(command Command, pidStr string) error {
err = kill(pid, syscall.SIGUSR1)
case CommandReload:
err = kill(pid, syscall.SIGHUP)
case commandLDMode:
err = kill(pid, syscall.SIGUSR2)
default:
err = fmt.Errorf("unknown signal %q", command)
}

View File

@@ -312,3 +312,29 @@ func TestProcessSignalReloadProcess(t *testing.T) {
t.Fatal("Expected kill to be called")
}
}
func TestProcessSignalLameDuckMode(t *testing.T) {
killBefore := kill
called := false
kill = func(pid int, signal syscall.Signal) error {
called = true
if pid != 123 {
t.Fatalf("pid is incorrect.\nexpected: 123\ngot: %d", pid)
}
if signal != syscall.SIGUSR2 {
t.Fatalf("signal is incorrect.\nexpected: sigusr2\ngot: %v", signal)
}
return nil
}
defer func() {
kill = killBefore
}()
if err := ProcessSignal(commandLDMode, "123"); err != nil {
t.Fatalf("ProcessSignal failed: %v", err)
}
if !called {
t.Fatal("Expected kill to be called")
}
}

View File

@@ -76,6 +76,9 @@ func ProcessSignal(command Command, service string) error {
case CommandReload:
cmd = svc.ParamChange
to = svc.Running
case commandLDMode:
cmd = ldmCmd
to = svc.Running
default:
return fmt.Errorf("unknown signal %q", command)
}