Merge pull request #523 from nats-io/logtime_reload

Round out config reload
This commit is contained in:
Tyler Treat
2017-06-28 14:35:37 -05:00
committed by GitHub
7 changed files with 400 additions and 23 deletions

View File

@@ -87,6 +87,7 @@ func (cf *clientFlag) clear(c clientFlag) {
type client struct {
// Here first because of use of atomics, and memory alignment.
stats
mpay int64
mu sync.Mutex
typ int
cid uint64
@@ -94,7 +95,6 @@ type client struct {
opts clientOpts
start time.Time
nc net.Conn
mpay int
ncs string
bw *bufio.Writer
srv *Server
@@ -524,8 +524,8 @@ func (c *client) maxConnExceeded() {
c.closeConnection()
}
func (c *client) maxPayloadViolation(sz int) {
c.Errorf("%s: %d vs %d", ErrMaxPayload.Error(), sz, c.mpay)
func (c *client) maxPayloadViolation(sz int, max int64) {
c.Errorf("%s: %d vs %d", ErrMaxPayload.Error(), sz, max)
c.sendErr("Maximum Payload Violation")
c.closeConnection()
}
@@ -712,8 +712,9 @@ func (c *client) processPub(arg []byte) error {
if c.pa.size < 0 {
return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg)
}
if c.mpay > 0 && c.pa.size > c.mpay {
c.maxPayloadViolation(c.pa.size)
maxPayload := atomic.LoadInt64(&c.mpay)
if maxPayload > 0 && int64(c.pa.size) > maxPayload {
c.maxPayloadViolation(c.pa.size, maxPayload)
return ErrMaxPayload
}

View File

@@ -0,0 +1,6 @@
# Copyright 2017 Apcera Inc. All rights reserved.
listen: localhost:-1
log_file: "/tmp/gnatsd.log"
max_connections: 1

View File

@@ -0,0 +1,6 @@
# Copyright 2017 Apcera Inc. All rights reserved.
listen: localhost:-1
log_file: "/tmp/gnatsd.log"
max_payload: 1

View File

@@ -1,10 +1,19 @@
# Copyright 2017 Apcera Inc. All rights reserved.
# logging options
debug: true # enable on reload
trace: true # enable on reload
logtime: false
log_file: "/tmp/gnatsd.log"
debug: true # enable on reload
trace: true # enable on reload
logtime: true # enable on reload
syslog: true # enable on reload
remote_syslog: "udp://localhost:514" # change on reload
log_file: "/tmp/gnatsd-2.log" # change on reload
pid_file: "/tmp/gnatsd.pid" # change on reload
max_control_line: 512 # change on reload
ping_interval: 5 # change on reload
ping_max: 1 # change on reload
write_deadline: "2s" # change on reload
max_payload: 1024 # change on reload
# Enable TLS on reload
tls {

View File

@@ -9,6 +9,8 @@ import (
"net/url"
"reflect"
"strings"
"sync/atomic"
"time"
)
// FlagSnapshot captures the server options as specified by CLI flags at
@@ -45,8 +47,7 @@ type traceOption struct {
newValue bool
}
// Apply is a no-op because authorization will be reloaded after options are
// applied
// Apply is a no-op because logging will be reloaded after options are applied.
func (t *traceOption) Apply(server *Server) {
server.Noticef("Reloaded: trace = %v", t.newValue)
}
@@ -57,12 +58,56 @@ type debugOption struct {
newValue bool
}
// Apply is a no-op because authorization will be reloaded after options are
// applied
// Apply is a no-op because logging will be reloaded after options are applied.
func (d *debugOption) Apply(server *Server) {
server.Noticef("Reloaded: debug = %v", d.newValue)
}
// logtimeOption implements the option interface for the `logtime` setting.
type logtimeOption struct {
loggingOption
newValue bool
}
// Apply is a no-op because logging will be reloaded after options are applied.
func (l *logtimeOption) Apply(server *Server) {
server.Noticef("Reloaded: logtime = %v", l.newValue)
}
// logfileOption implements the option interface for the `log_file` setting.
type logfileOption struct {
loggingOption
newValue string
}
// Apply is a no-op because logging will be reloaded after options are applied.
func (l *logfileOption) Apply(server *Server) {
server.Noticef("Reloaded: log_file = %v", l.newValue)
}
// syslogOption implements the option interface for the `syslog` setting.
type syslogOption struct {
loggingOption
newValue bool
}
// Apply is a no-op because logging will be reloaded after options are applied.
func (s *syslogOption) Apply(server *Server) {
server.Noticef("Reloaded: syslog = %v", s.newValue)
}
// remoteSyslogOption implements the option interface for the `remote_syslog`
// setting.
type remoteSyslogOption struct {
loggingOption
newValue string
}
// Apply is a no-op because logging will be reloaded after options are applied.
func (r *remoteSyslogOption) Apply(server *Server) {
server.Noticef("Reloaded: remote_syslog = %v", r.newValue)
}
// noopOption is a base struct that provides default no-op behaviors.
type noopOption struct{}
@@ -236,6 +281,134 @@ func (r *routesOption) Apply(server *Server) {
server.Noticef("Reloaded: cluster routes")
}
// maxConnOption implements the option interface for the `max_connections`
// setting.
type maxConnOption struct {
noopOption
newValue int
}
// Apply the max connections change by closing random connections til we are
// below the limit if necessary.
func (m *maxConnOption) Apply(server *Server) {
server.mu.Lock()
var (
clients = make([]*client, len(server.clients))
i = 0
)
// Map iteration is random, which allows us to close random connections.
for _, client := range server.clients {
clients[i] = client
i++
}
server.mu.Unlock()
if m.newValue > 0 && len(clients) > m.newValue {
// Close connections til we are within the limit.
var (
numClose = len(clients) - m.newValue
closed = 0
)
for _, client := range clients {
client.maxConnExceeded()
closed++
if closed >= numClose {
break
}
}
server.Noticef("Closed %d connections to fall within max_connections", closed)
}
server.Noticef("Reloaded: max_connections = %v", m.newValue)
}
// pidFileOption implements the option interface for the `pid_file` setting.
type pidFileOption struct {
noopOption
newValue string
}
// Apply the setting by logging the pid to the new file.
func (p *pidFileOption) Apply(server *Server) {
if p.newValue == "" {
return
}
if err := server.logPid(); err != nil {
server.Errorf("Failed to write pidfile: %v", err)
}
server.Noticef("Reloaded: pid_file = %v", p.newValue)
}
// maxControlLineOption implements the option interface for the
// `max_control_line` setting.
type maxControlLineOption struct {
noopOption
newValue int
}
// Apply is a no-op because the max control line will be reloaded after options
// are applied
func (m *maxControlLineOption) Apply(server *Server) {
server.Noticef("Reloaded: max_control_line = %d", m.newValue)
}
// maxPayloadOption implements the option interface for the `max_payload`
// setting.
type maxPayloadOption struct {
noopOption
newValue int
}
// Apply the setting by updating the server info and each client.
func (m *maxPayloadOption) Apply(server *Server) {
server.mu.Lock()
server.info.MaxPayload = m.newValue
server.generateServerInfoJSON()
for _, client := range server.clients {
atomic.StoreInt64(&client.mpay, int64(m.newValue))
}
server.mu.Unlock()
server.Noticef("Reloaded: max_payload = %d", m.newValue)
}
// pingIntervalOption implements the option interface for the `ping_interval`
// setting.
type pingIntervalOption struct {
noopOption
newValue time.Duration
}
// Apply is a no-op because the ping interval will be reloaded after options
// are applied.
func (p *pingIntervalOption) Apply(server *Server) {
server.Noticef("Reloaded: ping_interval = %s", p.newValue)
}
// maxPingsOutOption implements the option interface for the `ping_max`
// setting.
type maxPingsOutOption struct {
noopOption
newValue int
}
// Apply is a no-op because the ping interval will be reloaded after options
// are applied.
func (m *maxPingsOutOption) Apply(server *Server) {
server.Noticef("Reloaded: ping_max = %d", m.newValue)
}
// writeDeadlineOption implements the option interface for the `write_deadline`
// setting.
type writeDeadlineOption struct {
noopOption
newValue time.Duration
}
// Apply is a no-op because the write deadline will be reloaded after options
// are applied.
func (w *writeDeadlineOption) Apply(server *Server) {
server.Noticef("Reloaded: write_deadline = %s", w.newValue)
}
// Reload reads the current configuration file and applies any supported
// changes. This returns an error if the server was not started with a config
// file or an option which doesn't support hot-swapping was changed.
@@ -296,6 +469,14 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &traceOption{newValue: newValue.(bool)})
case "debug":
diffOpts = append(diffOpts, &debugOption{newValue: newValue.(bool)})
case "logtime":
diffOpts = append(diffOpts, &logtimeOption{newValue: newValue.(bool)})
case "logfile":
diffOpts = append(diffOpts, &logfileOption{newValue: newValue.(string)})
case "syslog":
diffOpts = append(diffOpts, &syslogOption{newValue: newValue.(bool)})
case "remotesyslog":
diffOpts = append(diffOpts, &remoteSyslogOption{newValue: newValue.(string)})
case "tlsconfig":
diffOpts = append(diffOpts, &tlsOption{newValue: newValue.(*tls.Config)})
case "tlstimeout":
@@ -319,6 +500,20 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
case "routes":
add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL))
diffOpts = append(diffOpts, &routesOption{add: add, remove: remove})
case "maxconn":
diffOpts = append(diffOpts, &maxConnOption{newValue: newValue.(int)})
case "pidfile":
diffOpts = append(diffOpts, &pidFileOption{newValue: newValue.(string)})
case "maxcontrolline":
diffOpts = append(diffOpts, &maxControlLineOption{newValue: newValue.(int)})
case "maxpayload":
diffOpts = append(diffOpts, &maxPayloadOption{newValue: newValue.(int)})
case "pinginterval":
diffOpts = append(diffOpts, &pingIntervalOption{newValue: newValue.(time.Duration)})
case "maxpingsout":
diffOpts = append(diffOpts, &maxPingsOutOption{newValue: newValue.(int)})
case "writedeadline":
diffOpts = append(diffOpts, &writeDeadlineOption{newValue: newValue.(time.Duration)})
case "nolog":
// Ignore NoLog option since it's not parsed and only used in
// testing.

View File

@@ -181,6 +181,18 @@ func TestConfigReload(t *testing.T) {
if !updated.Debug {
t.Fatal("Expected Debug to be true")
}
if !updated.Logtime {
t.Fatal("Expected Logtime to be true")
}
if updated.LogFile != "/tmp/gnatsd-2.log" {
t.Fatalf("LogFile is incorrect.\nexpected: /tmp/gnatsd-2.log\ngot: %s", updated.LogFile)
}
if !updated.Syslog {
t.Fatal("Expected Syslog to be true")
}
if updated.RemoteSyslog != "udp://localhost:514" {
t.Fatalf("RemoteSyslog is incorrect.\nexpected: udp://localhost:514\ngot: %s", updated.RemoteSyslog)
}
if updated.TLSConfig == nil {
t.Fatal("Expected TLSConfig to be non-nil")
}
@@ -205,6 +217,24 @@ func TestConfigReload(t *testing.T) {
if !updated.Cluster.NoAdvertise {
t.Fatal("Expected NoAdvertise to be true")
}
if updated.PidFile != "/tmp/gnatsd.pid" {
t.Fatalf("PidFile is incorrect.\nexpected: /tmp/gnatsd.pid\ngot: %s", updated.PidFile)
}
if updated.MaxControlLine != 512 {
t.Fatalf("MaxControlLine is incorrect.\nexpected: 512\ngot: %d", updated.MaxControlLine)
}
if updated.PingInterval != 5*time.Second {
t.Fatalf("PingInterval is incorrect.\nexpected 5s\ngot: %s", updated.PingInterval)
}
if updated.MaxPingsOut != 1 {
t.Fatalf("MaxPingsOut is incorrect.\nexpected 1\ngot: %d", updated.MaxPingsOut)
}
if updated.WriteDeadline != 2*time.Second {
t.Fatalf("WriteDeadline is incorrect.\nexpected 2s\ngot: %s", updated.WriteDeadline)
}
if updated.MaxPayload != 1024 {
t.Fatalf("MaxPayload is incorrect.\nexpected 1024\ngot: %d", updated.MaxPayload)
}
}
// Ensure Reload supports TLS config changes. Test this by starting a server
@@ -1342,6 +1372,137 @@ func TestConfigReloadClusterRoutes(t *testing.T) {
}
}
// Ensure Reload supports changing the max connections. Test this by starting a
// server with no max connections, connecting two clients, reloading with a
// max connections of one, and ensuring one client is disconnected.
func TestConfigReloadMaxConnections(t *testing.T) {
server, opts, config := runServerWithSymlinkConfig(t, "tmp.conf", "./configs/reload/basic.conf")
defer os.Remove(config)
defer server.Shutdown()
// Make two connections.
addr := fmt.Sprintf("nats://%s:%d", opts.Host, server.Addr().(*net.TCPAddr).Port)
nc1, err := nats.Connect(addr)
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer nc1.Close()
closed := make(chan struct{}, 1)
nc1.SetDisconnectHandler(func(*nats.Conn) {
closed <- struct{}{}
})
nc2, err := nats.Connect(addr)
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer nc2.Close()
nc2.SetDisconnectHandler(func(*nats.Conn) {
closed <- struct{}{}
})
if numClients := server.NumClients(); numClients != 2 {
t.Fatalf("Expected 2 clients, got %d", numClients)
}
// Set max connections to one.
if err := os.Remove(config); err != nil {
t.Fatalf("Error deleting symlink: %v", err)
}
if err := os.Symlink("./configs/reload/max_connections.conf", config); err != nil {
t.Fatalf("Error creating symlink: %v (ensure you have privileges)", err)
}
if err := server.Reload(); err != nil {
t.Fatalf("Error reloading config: %v", err)
}
// Ensure one connection was closed.
select {
case <-closed:
case <-time.After(2 * time.Second):
t.Fatal("Expected to be disconnected")
}
if numClients := server.NumClients(); numClients != 1 {
t.Fatalf("Expected 1 client, got %d", numClients)
}
// Ensure new connections fail.
_, err = nats.Connect(addr)
if err == nil {
t.Fatal("Expected error on connect")
}
}
// Ensure reload supports changing the max payload size. Test this by starting
// a server with the default size limit, ensuring publishes work, reloading
// with a restrictive limit, and ensuring publishing an oversized message fails
// and disconnects the client.
func TestConfigReloadMaxPayload(t *testing.T) {
server, opts, config := runServerWithSymlinkConfig(t, "tmp.conf", "./configs/reload/basic.conf")
defer os.Remove(config)
defer server.Shutdown()
addr := fmt.Sprintf("nats://%s:%d", opts.Host, server.Addr().(*net.TCPAddr).Port)
nc, err := nats.Connect(addr)
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer nc.Close()
closed := make(chan struct{})
nc.SetDisconnectHandler(func(*nats.Conn) {
closed <- struct{}{}
})
conn, err := nats.Connect(addr)
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer conn.Close()
sub, err := conn.SubscribeSync("foo")
if err != nil {
t.Fatalf("Error subscribing: %v", err)
}
conn.Flush()
// Ensure we can publish as a sanity check.
if err := nc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error publishing: %v", err)
}
nc.Flush()
_, err = sub.NextMsg(2 * time.Second)
if err != nil {
t.Fatalf("Error receiving message: %v", err)
}
// Set max payload to one.
if err := os.Remove(config); err != nil {
t.Fatalf("Error deleting symlink: %v", err)
}
if err := os.Symlink("./configs/reload/max_payload.conf", config); err != nil {
t.Fatalf("Error creating symlink: %v (ensure you have privileges)", err)
}
if err := server.Reload(); err != nil {
t.Fatalf("Error reloading config: %v", err)
}
// Ensure oversized messages don't get delivered and the client is
// disconnected.
if err := nc.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error publishing: %v", err)
}
nc.Flush()
_, err = sub.NextMsg(20 * time.Millisecond)
if err != nats.ErrTimeout {
t.Fatalf("Expected ErrTimeout, got: %v", err)
}
select {
case <-closed:
case <-time.After(2 * time.Second):
t.Fatal("Expected to be disconnected")
}
}
func runServerWithSymlinkConfig(t *testing.T, symlinkName, configName string) (*Server, *Options, string) {
opts, config := newOptionsWithSymlinkConfig(t, symlinkName, configName)
opts.NoLog = true

View File

@@ -224,12 +224,9 @@ func (s *Server) isRunning() bool {
return s.running
}
func (s *Server) logPid() {
func (s *Server) logPid() error {
pidStr := strconv.Itoa(os.Getpid())
err := ioutil.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660)
if err != nil {
PrintAndDie(fmt.Sprintf("Could not write pidfile: %v\n", err))
}
return ioutil.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660)
}
// Start up the server, this will block.
@@ -252,7 +249,9 @@ func (s *Server) Start() {
// Log the pid to a file
if opts.PidFile != _EMPTY_ {
s.logPid()
if err := s.logPid(); err != nil {
PrintAndDie(fmt.Sprintf("Could not write pidfile: %v\n", err))
}
}
// Start monitoring if needed
@@ -638,7 +637,10 @@ func (s *Server) HTTPHandler() http.Handler {
}
func (s *Server) createClient(conn net.Conn) *client {
c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: s.info.MaxPayload, start: time.Now()}
// Snapshot server options.
opts := s.getOpts()
c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: int64(opts.MaxPayload), start: time.Now()}
// Grab JSON info string
s.mu.Lock()
@@ -673,9 +675,6 @@ func (s *Server) createClient(conn net.Conn) *client {
return c
}
// Snapshot server options.
opts := s.getOpts()
// If there is a max connections specified, check that adding
// this new client would not push us over the max
if opts.MaxConn > 0 && len(s.clients) >= opts.MaxConn {