From f5bd07b36c9c9519fc5a980ccad6aa65aebae8a4 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Sun, 1 Mar 2020 21:28:23 -0500 Subject: [PATCH] [FIXED] trace/debug/sys_log reload will affect existing clients Fixed #1296, by altering client state on reload Detect a trace level change on reload and update all clients. To avoid data races, read client.trace while holding the lock, pass the value into functions that trace while not holding the lock. Delete unused client.debug. Signed-off-by: Matthias Hanel --- server/client.go | 92 ++++++++++++++++---------- server/events.go | 8 ++- server/gateway.go | 14 ++-- server/leafnode.go | 19 ++++-- server/log.go | 14 +++- server/parser.go | 39 +++++------ server/parser_test.go | 4 +- server/reload.go | 83 ++++++++++++++++++++++- server/reload_test.go | 150 +++++++++++++++++++++++++++++++++++++++++- server/route.go | 28 ++++---- server/server.go | 2 +- 11 files changed, 363 insertions(+), 90 deletions(-) diff --git a/server/client.go b/server/client.go index 27afd28a..24a58746 100644 --- a/server/client.go +++ b/server/client.go @@ -229,7 +229,6 @@ type client struct { flags clientFlag // Compact booleans into a single field. Size will be increased when needed. 
- debug bool trace bool echo bool } @@ -434,6 +433,14 @@ func init() { rand.Seed(time.Now().UnixNano()) } +func (c *client) setTraceLevel() { + if c.kind == SYSTEM && !(atomic.LoadInt32(&c.srv.logging.traceSysAcc) != 0) { + c.trace = false + } else { + c.trace = (atomic.LoadInt32(&c.srv.logging.trace) != 0) + } +} + // Lock should be held func (c *client) initClient() { s := c.srv @@ -450,12 +457,7 @@ func (c *client) initClient() { c.subs = make(map[string]*subscription) c.echo = true - c.debug = (atomic.LoadInt32(&c.srv.logging.debug) != 0) - c.trace = (atomic.LoadInt32(&c.srv.logging.trace) != 0) - - if c.kind == SYSTEM && !c.srv.logging.traceSysAcc { - c.trace = false - } + c.setTraceLevel() // This is a scratch buffer used for processMsg() // The msg header starts with "RMSG ", which can be used @@ -1220,11 +1222,9 @@ func (c *client) flushSignal() bool { return false } +// Traces a message. +// Will NOT check if tracing is enabled, does NOT need the client lock. func (c *client) traceMsg(msg []byte) { - if !c.trace { - return - } - maxTrace := c.srv.getOpts().MaxTracedMsgLen if maxTrace > 0 && (len(msg)-LEN_CR_LF) > maxTrace { c.Tracef("<<- MSG_PAYLOAD: [\"%s...\"]", msg[:maxTrace]) @@ -1233,19 +1233,27 @@ func (c *client) traceMsg(msg []byte) { } } +// Traces an incoming operation. +// Will NOT check if tracing is enabled, does NOT need the client lock. func (c *client) traceInOp(op string, arg []byte) { c.traceOp("<<- %s", op, arg) } +// Traces an outgoing operation. +// Will check if tracing is enabled, DOES need the client lock. +func (c *client) traceOutOpIfOk(op string, arg []byte) { + if c.trace { + c.traceOutOp(op, arg) + } +} + +// Traces an outgoing operation. +// Will NOT check if tracing is enabled, does NOT need the client lock. 
func (c *client) traceOutOp(op string, arg []byte) { c.traceOp("->> %s", op, arg) } func (c *client) traceOp(format, op string, arg []byte) { - if !c.trace { - return - } - opa := []interface{}{} if op != "" { opa = append(opa, op) @@ -1331,8 +1339,8 @@ func computeRTT(start time.Time) time.Duration { return rtt } -func (c *client) processConnect(arg []byte) error { - if c.trace { +func (c *client) processConnect(arg []byte, trace bool) error { + if trace { c.traceInOp("CONNECT", removePassFromTrace(arg)) } @@ -1655,7 +1663,7 @@ func (c *client) enqueueProto(proto []byte) { // Assume the lock is held upon entry. func (c *client) sendPong() { - c.traceOutOp("PONG", nil) + c.traceOutOpIfOk("PONG", nil) c.enqueueProto([]byte(pongProto)) } @@ -1689,7 +1697,7 @@ func (c *client) sendRTTPingLocked() bool { func (c *client) sendPing() { c.rttStart = time.Now() c.ping.out++ - c.traceOutOp("PING", nil) + c.traceOutOpIfOk("PING", nil) c.enqueueProto([]byte(pingProto)) } @@ -1708,14 +1716,14 @@ func (c *client) generateClientInfoJSON(info Info) []byte { func (c *client) sendErr(err string) { c.mu.Lock() - c.traceOutOp("-ERR", []byte(err)) + c.traceOutOpIfOk("-ERR", []byte(err)) c.enqueueProto([]byte(fmt.Sprintf(errProto, err))) c.mu.Unlock() } func (c *client) sendOK() { c.mu.Lock() - c.traceOutOp("OK", nil) + c.traceOutOpIfOk("OK", nil) c.enqueueProto([]byte(okProto)) c.pcd[c] = needFlush c.mu.Unlock() @@ -1723,7 +1731,10 @@ func (c *client) sendOK() { func (c *client) processPing() { c.mu.Lock() - c.traceInOp("PING", nil) + if c.trace { + c.traceInOp("PING", nil) + } + if c.isClosed() { c.mu.Unlock() return @@ -1781,8 +1792,10 @@ func (c *client) processPing() { } func (c *client) processPong() { - c.traceInOp("PONG", nil) c.mu.Lock() + if c.trace { + c.traceInOp("PONG", nil) + } c.ping.out = 0 c.rtt = computeRTT(c.rttStart) srv := c.srv @@ -1793,7 +1806,7 @@ func (c *client) processPong() { } } -func (c *client) processPub(trace bool, arg []byte) error { +func (c *client) 
processPub(arg []byte, trace bool) error { if trace { c.traceInOp("PUB", arg) } @@ -1874,8 +1887,10 @@ func splitArg(arg []byte) [][]byte { return args } -func (c *client) processSub(argo []byte, noForward bool) (*subscription, error) { - c.traceInOp("SUB", argo) +func (c *client) processSub(argo []byte, noForward bool, trace bool) (*subscription, error) { + if trace { + c.traceInOp("SUB", argo) + } // Indicate activity. c.in.subs++ @@ -2213,7 +2228,10 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool c.mu.Unlock() return } - c.traceOp("<-> %s", "DELSUB", sub.sid) + + if c.trace { + c.traceOp("<-> %s", "DELSUB", sub.sid) + } if c.kind != CLIENT && c.kind != SYSTEM { c.removeReplySubTimeout(sub) @@ -2255,8 +2273,10 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool } } -func (c *client) processUnsub(arg []byte) error { - c.traceInOp("UNSUB", arg) +func (c *client) processUnsub(arg []byte, trace bool) error { + if trace { + c.traceInOp("UNSUB", arg) + } args := splitArg(arg) var sid []byte max := -1 @@ -2710,27 +2730,27 @@ func isReservedReply(reply []byte) bool { } // This will decide to call the client code or router code. -func (c *client) processInboundMsg(msg []byte) { +func (c *client) processInboundMsg(msg []byte, trace bool) { switch c.kind { case CLIENT: - c.processInboundClientMsg(msg) + c.processInboundClientMsg(msg, trace) case ROUTER: - c.processInboundRoutedMsg(msg) + c.processInboundRoutedMsg(msg, trace) case GATEWAY: - c.processInboundGatewayMsg(msg) + c.processInboundGatewayMsg(msg, trace) case LEAF: - c.processInboundLeafMsg(msg) + c.processInboundLeafMsg(msg, trace) } } // processInboundClientMsg is called to process an inbound msg from a client. -func (c *client) processInboundClientMsg(msg []byte) { +func (c *client) processInboundClientMsg(msg []byte, trace bool) { // Update statistics // The msg includes the CR_LF, so pull back out for accounting. 
c.in.msgs++ c.in.bytes += int32(len(msg) - LEN_CR_LF) - if c.trace { + if trace { c.traceMsg(msg) } diff --git a/server/events.go b/server/events.go index e3c60354..972f2d64 100644 --- a/server/events.go +++ b/server/events.go @@ -265,16 +265,17 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) { c.pa.size = len(b) c.pa.szb = []byte(strconv.FormatInt(int64(len(b)), 10)) c.pa.reply = []byte(pm.rply) + trace := c.trace c.mu.Unlock() - if c.trace { + if trace { c.traceInOp(fmt.Sprintf( "PUB %s %s %d", c.pa.subject, c.pa.reply, c.pa.size), nil) } // Add in NL b = append(b, _CRLF_...) - c.processInboundClientMsg(b) + c.processInboundClientMsg(b, trace) // See if we are doing graceful shutdown. if !pm.last { c.flushClients(0) // Never spend time in place. @@ -1020,10 +1021,11 @@ func (s *Server) systemSubscribe(subject string, internalOnly bool, cb msgHandle s.sys.subs[sid] = cb s.sys.sid++ c := s.sys.client + trace := c.trace s.mu.Unlock() // Now create the subscription - return c.processSub([]byte(subject+" "+sid), internalOnly) + return c.processSub([]byte(subject+" "+sid), internalOnly, trace) } func (s *Server) sysUnsubscribe(sub *subscription) { diff --git a/server/gateway.go b/server/gateway.go index 3b9bb064..44c64ba7 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -1722,8 +1722,8 @@ func (c *client) processGatewayAccountSub(accName string) error { // If in modeInterestOnly or for a queue sub, remove from // the sublist if present. // -func (c *client) processGatewayRUnsub(arg []byte) error { - accName, subject, queue, err := c.parseUnsubProto(arg) +func (c *client) processGatewayRUnsub(arg []byte, trace bool) error { + accName, subject, queue, err := c.parseUnsubProto(trace, arg) if err != nil { return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error()) } @@ -1813,8 +1813,10 @@ func (c *client) processGatewayRUnsub(arg []byte) error { // For queue subs, or if in modeInterestOnly, register interest // from remote gateway. 
// -func (c *client) processGatewayRSub(arg []byte) error { - c.traceInOp("RS+", arg) +func (c *client) processGatewayRSub(arg []byte, trace bool) error { + if trace { + c.traceInOp("RS+", arg) + } // Indicate activity. c.in.subs++ @@ -2725,13 +2727,13 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) { // account or subject for which there is no interest in this cluster // an A-/RS- protocol may be send back. // -func (c *client) processInboundGatewayMsg(msg []byte) { +func (c *client) processInboundGatewayMsg(msg []byte, trace bool) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. c.in.bytes += int32(len(msg) - LEN_CR_LF) - if c.trace { + if trace { c.traceMsg(msg) } diff --git a/server/leafnode.go b/server/leafnode.go index ccef467d..926bfeb7 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1193,6 +1193,7 @@ func (c *client) sendAllLeafSubs() { c.mu.Unlock() } +// Lock should be held. func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) { if key == "" { return @@ -1226,8 +1227,10 @@ func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) { } // processLeafSub will process an inbound sub request for the remote leaf node. -func (c *client) processLeafSub(argo []byte) (err error) { - c.traceInOp("LS+", argo) +func (c *client) processLeafSub(argo []byte, trace bool) (err error) { + if trace { + c.traceInOp("LS+", argo) + } // Indicate activity. c.in.subs++ @@ -1352,8 +1355,10 @@ func (s *Server) reportLeafNodeLoop(c *client) { } // processLeafUnsub will process an inbound unsub request for the remote leaf node. -func (c *client) processLeafUnsub(arg []byte) error { - c.traceInOp("LS-", arg) +func (c *client) processLeafUnsub(arg []byte, trace bool) error { + if trace { + c.traceInOp("LS-", arg) + } // Indicate any activity, so pub and sub or unsubs. 
c.in.subs++ @@ -1389,7 +1394,7 @@ func (c *client) processLeafUnsub(arg []byte) error { return nil } -func (c *client) processLeafMsgArgs(trace bool, arg []byte) error { +func (c *client) processLeafMsgArgs(arg []byte, trace bool) error { if trace { c.traceInOp("LMSG", arg) } @@ -1464,13 +1469,13 @@ func (c *client) processLeafMsgArgs(trace bool, arg []byte) error { } // processInboundLeafMsg is called to process an inbound msg from a leaf node. -func (c *client) processInboundLeafMsg(msg []byte) { +func (c *client) processInboundLeafMsg(msg []byte, trace bool) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. c.in.bytes += int32(len(msg) - LEN_CR_LF) - if c.trace { + if trace { c.traceMsg(msg) } diff --git a/server/log.go b/server/log.go index 0787d06c..862088c8 100644 --- a/server/log.go +++ b/server/log.go @@ -85,13 +85,16 @@ func (s *Server) ConfigureLogger() { log = srvlog.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, colors, true) } - s.SetLogger(log, opts.Debug, opts.Trace) - - s.logging.traceSysAcc = opts.TraceVerbose + s.SetLoggerV2(log, opts.Debug, opts.Trace, opts.TraceVerbose) } // SetLogger sets the logger of the server func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool) { + s.SetLoggerV2(logger, debugFlag, traceFlag, false) +} + +// SetLoggerV2 sets the logger of the server, including the system account trace flag +func (s *Server) SetLoggerV2(logger Logger, debugFlag, traceFlag, sysTrace bool) { if debugFlag { atomic.StoreInt32(&s.logging.debug, 1) } else { @@ -102,6 +105,11 @@ func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool) { } else { atomic.StoreInt32(&s.logging.trace, 0) } + if sysTrace { + atomic.StoreInt32(&s.logging.traceSysAcc, 1) + } else { + atomic.StoreInt32(&s.logging.traceSysAcc, 0) + } s.logging.Lock() if s.logging.logger != nil { // Check to see if the logger implements io.Closer. 
This could be a diff --git a/server/parser.go b/server/parser.go index 8d58962b..ba1eca01 100644 --- a/server/parser.go +++ b/server/parser.go @@ -119,6 +119,7 @@ func (c *client) parse(buf []byte) error { authSet := c.awaitingAuth() // Snapshot max control line as well. mcl := c.mcl + trace := c.trace c.mu.Unlock() // Move to loop instead of range syntax to allow jumping of i @@ -220,7 +221,7 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } - if err := c.processPub(c.trace, arg); err != nil { + if err := c.processPub(arg, trace); err != nil { return err } c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD @@ -277,7 +278,7 @@ func (c *client) parse(buf []byte) error { } else { c.msgBuf = buf[c.as : i+1] } - c.processInboundMsg(c.msgBuf) + c.processInboundMsg(c.msgBuf, trace) c.argBuf, c.msgBuf = nil, nil c.drop, c.as, c.state = 0, i+1, OP_START // Drop all pub args @@ -319,7 +320,7 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } - if err := c.processAccountSub(arg); err != nil { + if err := c.processAccountSub(arg, trace); err != nil { return err } c.drop, c.as, c.state = 0, i+1, OP_START @@ -407,13 +408,13 @@ func (c *client) parse(buf []byte) error { switch c.kind { case CLIENT: - _, err = c.processSub(arg, false) + _, err = c.processSub(arg, false, trace) case ROUTER: - err = c.processRemoteSub(arg) + err = c.processRemoteSub(arg, trace) case GATEWAY: - err = c.processGatewayRSub(arg) + err = c.processGatewayRSub(arg, trace) case LEAF: - err = c.processLeafSub(arg) + err = c.processLeafSub(arg, trace) } if err != nil { return err @@ -519,13 +520,13 @@ func (c *client) parse(buf []byte) error { switch c.kind { case CLIENT: - err = c.processUnsub(arg) + err = c.processUnsub(arg, trace) case ROUTER: - err = c.processRemoteUnsub(arg) + err = c.processRemoteUnsub(arg, trace) case GATEWAY: - err = c.processGatewayRUnsub(arg) + err = c.processGatewayRUnsub(arg, trace) case LEAF: - err = 
c.processLeafUnsub(arg) + err = c.processLeafUnsub(arg, trace) } if err != nil { return err @@ -638,7 +639,7 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } - if err := c.processConnect(arg); err != nil { + if err := c.processConnect(arg, trace); err != nil { return err } c.drop, c.state = 0, OP_START @@ -694,9 +695,9 @@ func (c *client) parse(buf []byte) error { } var err error if c.kind == ROUTER || c.kind == GATEWAY { - err = c.processRoutedMsgArgs(c.trace, arg) + err = c.processRoutedMsgArgs(arg, trace) } else if c.kind == LEAF { - err = c.processLeafMsgArgs(c.trace, arg) + err = c.processLeafMsgArgs(arg, trace) } if err != nil { return err @@ -869,7 +870,7 @@ func (c *client) parse(buf []byte) error { // read buffer and we are not able to process the msg. if c.argBuf == nil { // Works also for MSG_ARG, when message comes from ROUTE. - if err := c.clonePubArg(); err != nil { + if err := c.clonePubArg(trace); err != nil { goto parseErr } } @@ -921,17 +922,17 @@ func protoSnippet(start int, buf []byte) string { // clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but // we need to hold onto it into the next read. -func (c *client) clonePubArg() error { +func (c *client) clonePubArg(trace bool) error { // Just copy and re-process original arg buffer. c.argBuf = c.scratch[:0] c.argBuf = append(c.argBuf, c.pa.arg...) 
switch c.kind { case ROUTER, GATEWAY: - return c.processRoutedMsgArgs(false, c.argBuf) + return c.processRoutedMsgArgs(c.argBuf, trace) case LEAF: - return c.processLeafMsgArgs(false, c.argBuf) + return c.processLeafMsgArgs(c.argBuf, trace) default: - return c.processPub(false, c.argBuf) + return c.processPub(c.argBuf, trace) } } diff --git a/server/parser_test.go b/server/parser_test.go index 7f2aece9..1e267157 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -301,7 +301,7 @@ func TestParsePubArg(t *testing.T) { subject: "foo", reply: "", size: 2222, szb: "2222"}, } { t.Run(test.arg, func(t *testing.T) { - if err := c.processPub(false, []byte(test.arg)); err != nil { + if err := c.processPub([]byte(test.arg), false); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } if !bytes.Equal(c.pa.subject, []byte(test.subject)) { @@ -324,7 +324,7 @@ func TestParsePubBadSize(t *testing.T) { c := dummyClient() // Setup localized max payload c.mpay = 32768 - if err := c.processPub(false, []byte("foo 2222222222222222")); err == nil { + if err := c.processPub([]byte("foo 2222222222222222"), false); err == nil { t.Fatalf("Expected parse error for size too large") } } diff --git a/server/reload.go b/server/reload.go index 89553be6..0d5dfac5 100644 --- a/server/reload.go +++ b/server/reload.go @@ -40,6 +40,10 @@ type option interface { // IsLoggingChange indicates if this option requires reloading the logger. IsLoggingChange() bool + // IsTraceLevelChange indicates if this option requires reloading cached trace level. + // Clients store trace level separately. + IsTraceLevelChange() bool + // IsAuthChange indicates if this option requires reloading authorization. 
IsAuthChange() bool @@ -55,6 +59,10 @@ func (n noopOption) IsLoggingChange() bool { return false } +func (n noopOption) IsTraceLevelChange() bool { + return false +} + func (n noopOption) IsAuthChange() bool { return false } @@ -73,9 +81,19 @@ func (l loggingOption) IsLoggingChange() bool { return true } +// traceLevelOption is a base struct that provides default option behaviors for +// tracelevel-related options. +type traceLevelOption struct { + loggingOption +} + +func (l traceLevelOption) IsTraceLevelChange() bool { + return true +} + // traceOption implements the option interface for the `trace` setting. type traceOption struct { - loggingOption + traceLevelOption newValue bool } @@ -84,6 +102,17 @@ func (t *traceOption) Apply(server *Server) { server.Noticef("Reloaded: trace = %v", t.newValue) } +// traceVerboseOption implements the option interface for the `trace_verbose` setting. +type traceVerboseOption struct { + traceLevelOption + newValue bool +} + +// Apply is a no-op because logging will be reloaded after options are applied. +func (t *traceVerboseOption) Apply(server *Server) { + server.Noticef("Reloaded: trace_verbose = %v", t.newValue) +} + // debugOption implements the option interface for the `debug` setting. 
type debugOption struct { loggingOption @@ -676,6 +705,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { continue } switch strings.ToLower(field.Name) { + case "traceverbose": + diffOpts = append(diffOpts, &traceVerboseOption{newValue: newValue.(bool)}) case "trace": diffOpts = append(diffOpts, &traceOption{newValue: newValue.(bool)}) case "debug": @@ -819,12 +850,16 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { reloadLogging = false reloadAuth = false reloadClusterPerms = false + reloadClientTrcLvl = false ) for _, opt := range opts { opt.Apply(s) if opt.IsLoggingChange() { reloadLogging = true } + if opt.IsTraceLevelChange() { + reloadClientTrcLvl = true + } if opt.IsAuthChange() { reloadAuth = true } @@ -836,6 +871,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { if reloadLogging { s.ConfigureLogger() } + if reloadClientTrcLvl { + s.reloadClientTraceLevel() + } if reloadAuth { s.reloadAuthorization() } @@ -846,6 +884,49 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { s.Noticef("Reloaded server configuration") } +// reloadClientTraceLevel updates the cached trace setting of every client +func (s *Server) reloadClientTraceLevel() { + opts := s.getOpts() + + if opts.NoLog { + return + } + + update := func(c *client) { + // client.trace is commonly read while holding the lock + c.mu.Lock() + defer c.mu.Unlock() + c.setTraceLevel() + } + + // Iterate over every client and update + // If this is not timely AND unexpected during a reload (with changes in trace level): + // Consider storing clients in a list, then iterate without holding locks + // Or usage of sync.Map to store clients in the first place + + if s.eventsEnabled() { + update(s.sys.client) + } + + s.mu.Lock() + cMaps := []map[uint64]*client{s.clients, s.grTmpClients, s.routes, s.leafs} + for _, m := range cMaps { + for _, c := range m { + update(c) + } + } + s.mu.Unlock() + + s.gateway.RLock() + for _, c := range s.gateway.in { + 
update(c) + } + for _, c := range s.gateway.outo { + update(c) + } + s.gateway.RUnlock() +} + // reloadAuthorization reconfigures the server authorization settings, // disconnects any clients who are no longer authorized, and removes any // unauthorized subscriptions. diff --git a/server/reload_test.go b/server/reload_test.go index 89aa0e7c..888e6cd6 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -20,7 +20,6 @@ import ( "encoding/json" "flag" "fmt" - "github.com/nats-io/jwt" "io/ioutil" "log" "net" @@ -35,6 +34,8 @@ import ( "testing" "time" + "github.com/nats-io/jwt" + "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" ) @@ -3569,6 +3570,30 @@ func TestConfigReloadBoolFlags(t *testing.T) { false, func() bool { return opts.Debug && opts.Trace }, }, + { + "trace_verbose_true_in_config_override_true", + `trace_verbose: true + `, + nil, + true, + func() bool { return opts.Trace && opts.TraceVerbose }, + }, + { + "trace_verbose_true_in_config_override_false", + `trace_verbose: true + `, + []string{"--VV=false"}, + true, + func() bool { return !opts.TraceVerbose }, + }, + { + "trace_verbose_true_in_config_override_false", + `trace_verbose: false + `, + []string{"--VV=true"}, + true, + func() bool { return opts.TraceVerbose }, + }, } { t.Run(test.name, func(t *testing.T) { conf := createConfFile(t, []byte(fmt.Sprintf(template, test.content))) @@ -3973,3 +3998,126 @@ func TestConfigReloadAccountResolverTLSConfig(t *testing.T) { t.Fatalf("Account name did not match claim key") } } + +func TestLoggingReload(t *testing.T) { + // This test basically starts a server and causes its configuration to be reloaded 3 times. + // Each time, a new log file is created and trace levels are turned off - on - off. + + // At the end of the test, all 3 log files are inspected for certain traces. 
+ countMatches := func(log []byte, stmts ...string) int { + matchCnt := 0 + for _, stmt := range stmts { + if strings.Contains(string(log), stmt) { + matchCnt++ + } + } + return matchCnt + } + + traces := []string{"[TRC]", "[DBG]", "SYSTEM", "MSG_PAYLOAD", "$SYS.SERVER.ACCOUNT"} + + didTrace := func(log []byte) bool { + return countMatches(log, "[INF] Reloaded server configuration") == 1 + } + + tracingAbsent := func(log []byte) bool { + return countMatches(log, traces...) == 0 && didTrace(log) + } + + tracingPresent := func(log []byte) bool { + return len(traces) == countMatches(log, traces...) && didTrace(log) + } + + check := func(filename string, valid func([]byte) bool) { + log, err := ioutil.ReadFile(filename) + if err != nil { + t.Fatalf("Error reading log file %s: %v\n", filename, err) + } + if !valid(log) { + t.Fatalf("%s is not valid: %s", filename, log) + } + //t.Logf("%s contains: %s\n", filename, log) + } + + // common configuration setting up system accounts. trace_verbose needs this to cause traces + commonCfg := ` + port: -1 + system_account: sys + accounts { + sys { users = [ {user: sys, pass: "" } ] } + nats.io: { users = [ { user : bar, pass: "pwd" } ] } + } + ` + + conf := createConfFile(t, []byte(commonCfg)) + defer os.Remove(conf) + + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + + reload := func(change string) { + changeCurrentConfigContentWithNewContent(t, conf, []byte(commonCfg+` + `+change+` + `)) + + if err := s.Reload(); err != nil { + t.Fatalf("Error during reload: %v", err) + } + } + + traffic := func(cnt int) { + // Create client and sub interest on server and create traffic + urlSeed := fmt.Sprintf("nats://bar:pwd@%s:%d/", opts.Host, opts.Port) + nc, err := nats.Connect(urlSeed) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + + msgs := make(chan *nats.Msg) + defer close(msgs) + + sub, err := nc.ChanSubscribe("foo", msgs) + if err != nil { + t.Fatalf("Error creating subscriber: %v\n", err) + } + + 
nc.Flush() + + for i := 0; i < cnt; i++ { + if err := nc.Publish("foo", []byte("bar")); err == nil { + <-msgs + } + } + + sub.Unsubscribe() + nc.Close() + } + + defer os.Remove("off-pre.log") + reload("log_file: off-pre.log") + + traffic(10) // generate NO trace/debug entries in off-pre.log + + defer os.Remove("on.log") + reload(` + log_file: on.log + debug: true + trace_verbose: true + `) + + traffic(10) // generate trace/debug entries in on.log + + defer os.Remove("off-post.log") + reload(` + log_file: off-post.log + debug: false + trace_verbose: false + `) + + traffic(10) // generate NO trace/debug entries in off-post.log + + // check resulting log files for expected content + check("off-pre.log", tracingAbsent) + check("on.log", tracingPresent) + check("off-post.log", tracingAbsent) +} diff --git a/server/route.go b/server/route.go index 0c190c6d..b0470c33 100644 --- a/server/route.go +++ b/server/route.go @@ -139,8 +139,10 @@ func (c *client) removeReplySubTimeout(sub *subscription) { } } -func (c *client) processAccountSub(arg []byte) error { - c.traceInOp("A+", arg) +func (c *client) processAccountSub(arg []byte, trace bool) error { + if trace { + c.traceInOp("A+", arg) + } accName := string(arg) if c.kind == GATEWAY { return c.processGatewayAccountSub(accName) @@ -157,7 +159,7 @@ func (c *client) processAccountUnsub(arg []byte) { } // Process an inbound RMSG specification from the remote route. -func (c *client) processRoutedMsgArgs(trace bool, arg []byte) error { +func (c *client) processRoutedMsgArgs(arg []byte, trace bool) error { if trace { c.traceInOp("RMSG", arg) } @@ -232,13 +234,13 @@ func (c *client) processRoutedMsgArgs(trace bool, arg []byte) error { } // processInboundRouteMsg is called to process an inbound msg from a route. -func (c *client) processInboundRoutedMsg(msg []byte) { +func (c *client) processInboundRoutedMsg(msg []byte, trace bool) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. 
c.in.bytes += int32(len(msg) - LEN_CR_LF) - if c.trace { + if trace { c.traceMsg(msg) } @@ -666,8 +668,10 @@ func (c *client) removeRemoteSubs() { } } -func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) { - c.traceInOp("RS-", arg) +func (c *client) parseUnsubProto(trace bool, arg []byte) (string, []byte, []byte, error) { + if trace { + c.traceInOp("RS-", arg) + } // Indicate any activity, so pub and sub or unsubs. c.in.subs++ @@ -686,12 +690,12 @@ func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) { } // Indicates no more interest in the given account/subject for the remote side. -func (c *client) processRemoteUnsub(arg []byte) (err error) { +func (c *client) processRemoteUnsub(arg []byte, trace bool) (err error) { srv := c.srv if srv == nil { return nil } - accountName, subject, _, err := c.parseUnsubProto(arg) + accountName, subject, _, err := c.parseUnsubProto(trace, arg) if err != nil { return fmt.Errorf("processRemoteUnsub %s", err.Error()) } @@ -736,8 +740,10 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) { return nil } -func (c *client) processRemoteSub(argo []byte) (err error) { - c.traceInOp("RS+", argo) +func (c *client) processRemoteSub(argo []byte, trace bool) (err error) { + if trace { + c.traceInOp("RS+", argo) + } // Indicate activity. c.in.subs++ diff --git a/server/server.go b/server/server.go index eca22485..18de4453 100644 --- a/server/server.go +++ b/server/server.go @@ -157,7 +157,7 @@ type Server struct { logger Logger trace int32 debug int32 - traceSysAcc bool + traceSysAcc int32 } clientConnectURLs []string