diff --git a/server/client.go b/server/client.go index 27afd28a..24a58746 100644 --- a/server/client.go +++ b/server/client.go @@ -229,7 +229,6 @@ type client struct { flags clientFlag // Compact booleans into a single field. Size will be increased when needed. - debug bool trace bool echo bool } @@ -434,6 +433,14 @@ func init() { rand.Seed(time.Now().UnixNano()) } +func (c *client) setTraceLevel() { + if c.kind == SYSTEM && !(atomic.LoadInt32(&c.srv.logging.traceSysAcc) != 0) { + c.trace = false + } else { + c.trace = (atomic.LoadInt32(&c.srv.logging.trace) != 0) + } +} + // Lock should be held func (c *client) initClient() { s := c.srv @@ -450,12 +457,7 @@ func (c *client) initClient() { c.subs = make(map[string]*subscription) c.echo = true - c.debug = (atomic.LoadInt32(&c.srv.logging.debug) != 0) - c.trace = (atomic.LoadInt32(&c.srv.logging.trace) != 0) - - if c.kind == SYSTEM && !c.srv.logging.traceSysAcc { - c.trace = false - } + c.setTraceLevel() // This is a scratch buffer used for processMsg() // The msg header starts with "RMSG ", which can be used @@ -1220,11 +1222,9 @@ func (c *client) flushSignal() bool { return false } +// Traces a message. +// Will NOT check if tracing is enabled, does NOT need the client lock. func (c *client) traceMsg(msg []byte) { - if !c.trace { - return - } - maxTrace := c.srv.getOpts().MaxTracedMsgLen if maxTrace > 0 && (len(msg)-LEN_CR_LF) > maxTrace { c.Tracef("<<- MSG_PAYLOAD: [\"%s...\"]", msg[:maxTrace]) @@ -1233,19 +1233,27 @@ func (c *client) traceMsg(msg []byte) { } } +// Traces an incoming operation. +// Will NOT check if tracing is enabled, does NOT need the client lock. func (c *client) traceInOp(op string, arg []byte) { c.traceOp("<<- %s", op, arg) } +// Traces an outgoing operation. +// Will check if tracing is enabled, DOES need the client lock. +func (c *client) traceOutOpIfOk(op string, arg []byte) { + if c.trace { + c.traceOutOp(op, arg) + } +} + +// Traces an outgoing operation. +// Will NOT check if tracing is enabled, does NOT need the client lock. func (c *client) traceOutOp(op string, arg []byte) { c.traceOp("->> %s", op, arg) } func (c *client) traceOp(format, op string, arg []byte) { - if !c.trace { - return - } - opa := []interface{}{} if op != "" { opa = append(opa, op) @@ -1331,8 +1339,8 @@ func computeRTT(start time.Time) time.Duration { return rtt } -func (c *client) processConnect(arg []byte) error { - if c.trace { +func (c *client) processConnect(arg []byte, trace bool) error { + if trace { c.traceInOp("CONNECT", removePassFromTrace(arg)) } @@ -1655,7 +1663,7 @@ func (c *client) enqueueProto(proto []byte) { // Assume the lock is held upon entry. func (c *client) sendPong() { - c.traceOutOp("PONG", nil) + c.traceOutOpIfOk("PONG", nil) c.enqueueProto([]byte(pongProto)) } @@ -1689,7 +1697,7 @@ func (c *client) sendRTTPingLocked() bool { func (c *client) sendPing() { c.rttStart = time.Now() c.ping.out++ - c.traceOutOp("PING", nil) + c.traceOutOpIfOk("PING", nil) c.enqueueProto([]byte(pingProto)) } @@ -1708,14 +1716,14 @@ func (c *client) generateClientInfoJSON(info Info) []byte { func (c *client) sendErr(err string) { c.mu.Lock() - c.traceOutOp("-ERR", []byte(err)) + c.traceOutOpIfOk("-ERR", []byte(err)) c.enqueueProto([]byte(fmt.Sprintf(errProto, err))) c.mu.Unlock() } func (c *client) sendOK() { c.mu.Lock() - c.traceOutOp("OK", nil) + c.traceOutOpIfOk("OK", nil) c.enqueueProto([]byte(okProto)) c.pcd[c] = needFlush c.mu.Unlock() @@ -1723,7 +1731,10 @@ func (c *client) sendOK() { func (c *client) processPing() { c.mu.Lock() - c.traceInOp("PING", nil) + if c.trace { + c.traceInOp("PING", nil) + } + if c.isClosed() { c.mu.Unlock() return @@ -1781,8 +1792,10 @@ func (c *client) processPing() { } func (c *client) processPong() { - c.traceInOp("PONG", nil) c.mu.Lock() + if c.trace { + c.traceInOp("PONG", nil) + } c.ping.out = 0 c.rtt = computeRTT(c.rttStart) srv := c.srv @@ -1793,7 +1806,7 @@ func (c *client) processPong() { } } -func (c *client) processPub(trace bool, arg []byte) error { +func (c *client) processPub(arg []byte, trace bool) error { if trace { c.traceInOp("PUB", arg) } @@ -1874,8 +1887,10 @@ func splitArg(arg []byte) [][]byte { return args } -func (c *client) processSub(argo []byte, noForward bool) (*subscription, error) { - c.traceInOp("SUB", argo) +func (c *client) processSub(argo []byte, noForward bool, trace bool) (*subscription, error) { + if trace { + c.traceInOp("SUB", argo) + } // Indicate activity. c.in.subs++ @@ -2213,7 +2228,10 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool c.mu.Unlock() return } - c.traceOp("<-> %s", "DELSUB", sub.sid) + + if c.trace { + c.traceOp("<-> %s", "DELSUB", sub.sid) + } if c.kind != CLIENT && c.kind != SYSTEM { c.removeReplySubTimeout(sub) @@ -2255,8 +2273,10 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool } } -func (c *client) processUnsub(arg []byte) error { - c.traceInOp("UNSUB", arg) +func (c *client) processUnsub(arg []byte, trace bool) error { + if trace { + c.traceInOp("UNSUB", arg) + } args := splitArg(arg) var sid []byte max := -1 @@ -2710,27 +2730,27 @@ func isReservedReply(reply []byte) bool { } // This will decide to call the client code or router code. -func (c *client) processInboundMsg(msg []byte) { +func (c *client) processInboundMsg(msg []byte, trace bool) { switch c.kind { case CLIENT: - c.processInboundClientMsg(msg) + c.processInboundClientMsg(msg, trace) case ROUTER: - c.processInboundRoutedMsg(msg) + c.processInboundRoutedMsg(msg, trace) case GATEWAY: - c.processInboundGatewayMsg(msg) + c.processInboundGatewayMsg(msg, trace) case LEAF: - c.processInboundLeafMsg(msg) + c.processInboundLeafMsg(msg, trace) } } // processInboundClientMsg is called to process an inbound msg from a client. -func (c *client) processInboundClientMsg(msg []byte) { +func (c *client) processInboundClientMsg(msg []byte, trace bool) { // Update statistics // The msg includes the CR_LF, so pull back out for accounting. c.in.msgs++ c.in.bytes += int32(len(msg) - LEN_CR_LF) - if c.trace { + if trace { c.traceMsg(msg) } diff --git a/server/events.go b/server/events.go index e3c60354..972f2d64 100644 --- a/server/events.go +++ b/server/events.go @@ -265,16 +265,17 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) { c.pa.size = len(b) c.pa.szb = []byte(strconv.FormatInt(int64(len(b)), 10)) c.pa.reply = []byte(pm.rply) + trace := c.trace c.mu.Unlock() - if c.trace { + if trace { c.traceInOp(fmt.Sprintf( "PUB %s %s %d", c.pa.subject, c.pa.reply, c.pa.size), nil) } // Add in NL b = append(b, _CRLF_...) - c.processInboundClientMsg(b) + c.processInboundClientMsg(b, trace) // See if we are doing graceful shutdown. if !pm.last { c.flushClients(0) // Never spend time in place. @@ -1020,10 +1021,11 @@ func (s *Server) systemSubscribe(subject string, internalOnly bool, cb msgHandle s.sys.subs[sid] = cb s.sys.sid++ c := s.sys.client + trace := c.trace s.mu.Unlock() // Now create the subscription - return c.processSub([]byte(subject+" "+sid), internalOnly) + return c.processSub([]byte(subject+" "+sid), internalOnly, trace) } func (s *Server) sysUnsubscribe(sub *subscription) { diff --git a/server/gateway.go b/server/gateway.go index 3b9bb064..44c64ba7 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -1722,8 +1722,8 @@ func (c *client) processGatewayAccountSub(accName string) error { // If in modeInterestOnly or for a queue sub, remove from // the sublist if present. // -func (c *client) processGatewayRUnsub(arg []byte) error { - accName, subject, queue, err := c.parseUnsubProto(arg) +func (c *client) processGatewayRUnsub(arg []byte, trace bool) error { + accName, subject, queue, err := c.parseUnsubProto(trace, arg) if err != nil { return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error()) } @@ -1813,8 +1813,10 @@ func (c *client) processGatewayRUnsub(arg []byte) error { // For queue subs, or if in modeInterestOnly, register interest // from remote gateway. // -func (c *client) processGatewayRSub(arg []byte) error { - c.traceInOp("RS+", arg) +func (c *client) processGatewayRSub(arg []byte, trace bool) error { + if trace { + c.traceInOp("RS+", arg) + } // Indicate activity. c.in.subs++ @@ -2725,13 +2727,13 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) { // account or subject for which there is no interest in this cluster // an A-/RS- protocol may be send back. // -func (c *client) processInboundGatewayMsg(msg []byte) { +func (c *client) processInboundGatewayMsg(msg []byte, trace bool) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. c.in.bytes += int32(len(msg) - LEN_CR_LF) - if c.trace { + if trace { c.traceMsg(msg) } diff --git a/server/leafnode.go b/server/leafnode.go index ccef467d..926bfeb7 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1193,6 +1193,7 @@ func (c *client) sendAllLeafSubs() { c.mu.Unlock() } +// Lock should be held. func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) { if key == "" { return @@ -1226,8 +1227,10 @@ func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) { } // processLeafSub will process an inbound sub request for the remote leaf node. -func (c *client) processLeafSub(argo []byte) (err error) { - c.traceInOp("LS+", argo) +func (c *client) processLeafSub(argo []byte, trace bool) (err error) { + if trace { + c.traceInOp("LS+", argo) + } // Indicate activity. c.in.subs++ @@ -1352,8 +1355,10 @@ func (s *Server) reportLeafNodeLoop(c *client) { } // processLeafUnsub will process an inbound unsub request for the remote leaf node. -func (c *client) processLeafUnsub(arg []byte) error { - c.traceInOp("LS-", arg) +func (c *client) processLeafUnsub(arg []byte, trace bool) error { + if trace { + c.traceInOp("LS-", arg) + } // Indicate any activity, so pub and sub or unsubs. c.in.subs++ @@ -1389,7 +1394,7 @@ func (c *client) processLeafUnsub(arg []byte) error { return nil } -func (c *client) processLeafMsgArgs(trace bool, arg []byte) error { +func (c *client) processLeafMsgArgs(arg []byte, trace bool) error { if trace { c.traceInOp("LMSG", arg) } @@ -1464,13 +1469,13 @@ func (c *client) processLeafMsgArgs(trace bool, arg []byte) error { } // processInboundLeafMsg is called to process an inbound msg from a leaf node. -func (c *client) processInboundLeafMsg(msg []byte) { +func (c *client) processInboundLeafMsg(msg []byte, trace bool) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. c.in.bytes += int32(len(msg) - LEN_CR_LF) - if c.trace { + if trace { c.traceMsg(msg) } diff --git a/server/log.go b/server/log.go index 0787d06c..862088c8 100644 --- a/server/log.go +++ b/server/log.go @@ -85,13 +85,16 @@ func (s *Server) ConfigureLogger() { log = srvlog.NewStdLogger(opts.Logtime, opts.Debug, opts.Trace, colors, true) } - s.SetLogger(log, opts.Debug, opts.Trace) - - s.logging.traceSysAcc = opts.TraceVerbose + s.SetLoggerV2(log, opts.Debug, opts.Trace, opts.TraceVerbose) } // SetLogger sets the logger of the server func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool) { + s.SetLoggerV2(logger, debugFlag, traceFlag, false) +} + +// SetLogger sets the logger of the server +func (s *Server) SetLoggerV2(logger Logger, debugFlag, traceFlag, sysTrace bool) { if debugFlag { atomic.StoreInt32(&s.logging.debug, 1) } else { @@ -102,6 +105,11 @@ func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool) { } else { atomic.StoreInt32(&s.logging.trace, 0) } + if sysTrace { + atomic.StoreInt32(&s.logging.traceSysAcc, 1) + } else { + atomic.StoreInt32(&s.logging.traceSysAcc, 0) + } s.logging.Lock() if s.logging.logger != nil { // Check to see if the logger implements io.Closer. This could be a diff --git a/server/parser.go b/server/parser.go index 8d58962b..ba1eca01 100644 --- a/server/parser.go +++ b/server/parser.go @@ -119,6 +119,7 @@ func (c *client) parse(buf []byte) error { authSet := c.awaitingAuth() // Snapshot max control line as well. mcl := c.mcl + trace := c.trace c.mu.Unlock() // Move to loop instead of range syntax to allow jumping of i @@ -220,7 +221,7 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } - if err := c.processPub(c.trace, arg); err != nil { + if err := c.processPub(arg, trace); err != nil { return err } c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD @@ -277,7 +278,7 @@ func (c *client) parse(buf []byte) error { } else { c.msgBuf = buf[c.as : i+1] } - c.processInboundMsg(c.msgBuf) + c.processInboundMsg(c.msgBuf, trace) c.argBuf, c.msgBuf = nil, nil c.drop, c.as, c.state = 0, i+1, OP_START // Drop all pub args @@ -319,7 +320,7 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } - if err := c.processAccountSub(arg); err != nil { + if err := c.processAccountSub(arg, trace); err != nil { return err } c.drop, c.as, c.state = 0, i+1, OP_START @@ -407,13 +408,13 @@ func (c *client) parse(buf []byte) error { switch c.kind { case CLIENT: - _, err = c.processSub(arg, false) + _, err = c.processSub(arg, false, trace) case ROUTER: - err = c.processRemoteSub(arg) + err = c.processRemoteSub(arg, trace) case GATEWAY: - err = c.processGatewayRSub(arg) + err = c.processGatewayRSub(arg, trace) case LEAF: - err = c.processLeafSub(arg) + err = c.processLeafSub(arg, trace) } if err != nil { return err @@ -519,13 +520,13 @@ func (c *client) parse(buf []byte) error { switch c.kind { case CLIENT: - err = c.processUnsub(arg) + err = c.processUnsub(arg, trace) case ROUTER: - err = c.processRemoteUnsub(arg) + err = c.processRemoteUnsub(arg, trace) case GATEWAY: - err = c.processGatewayRUnsub(arg) + err = c.processGatewayRUnsub(arg, trace) case LEAF: - err = c.processLeafUnsub(arg) + err = c.processLeafUnsub(arg, trace) } if err != nil { return err @@ -638,7 +639,7 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } - if err := c.processConnect(arg); err != nil { + if err := c.processConnect(arg, trace); err != nil { return err } c.drop, c.state = 0, OP_START @@ -694,9 +695,9 @@ func (c *client) parse(buf []byte) error { } var err error if c.kind == ROUTER || c.kind == GATEWAY { - err = c.processRoutedMsgArgs(c.trace, arg) + err = c.processRoutedMsgArgs(arg, trace) } else if c.kind == LEAF { - err = c.processLeafMsgArgs(c.trace, arg) + err = c.processLeafMsgArgs(arg, trace) } if err != nil { return err @@ -869,7 +870,7 @@ func (c *client) parse(buf []byte) error { // read buffer and we are not able to process the msg. if c.argBuf == nil { // Works also for MSG_ARG, when message comes from ROUTE. - if err := c.clonePubArg(); err != nil { + if err := c.clonePubArg(trace); err != nil { goto parseErr } } @@ -921,17 +922,17 @@ func protoSnippet(start int, buf []byte) string { // clonePubArg is used when the split buffer scenario has the pubArg in the existing read buffer, but // we need to hold onto it into the next read. -func (c *client) clonePubArg() error { +func (c *client) clonePubArg(trace bool) error { // Just copy and re-process original arg buffer. c.argBuf = c.scratch[:0] c.argBuf = append(c.argBuf, c.pa.arg...) switch c.kind { case ROUTER, GATEWAY: - return c.processRoutedMsgArgs(false, c.argBuf) + return c.processRoutedMsgArgs(c.argBuf, trace) case LEAF: - return c.processLeafMsgArgs(false, c.argBuf) + return c.processLeafMsgArgs(c.argBuf, trace) default: - return c.processPub(false, c.argBuf) + return c.processPub(c.argBuf, trace) } } diff --git a/server/parser_test.go b/server/parser_test.go index 7f2aece9..1e267157 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -301,7 +301,7 @@ func TestParsePubArg(t *testing.T) { subject: "foo", reply: "", size: 2222, szb: "2222"}, } { t.Run(test.arg, func(t *testing.T) { - if err := c.processPub(false, []byte(test.arg)); err != nil { + if err := c.processPub([]byte(test.arg), false); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } if !bytes.Equal(c.pa.subject, []byte(test.subject)) { @@ -324,7 +324,7 @@ func TestParsePubBadSize(t *testing.T) { c := dummyClient() // Setup localized max payload c.mpay = 32768 - if err := c.processPub(false, []byte("foo 2222222222222222")); err == nil { + if err := c.processPub([]byte("foo 2222222222222222"), false); err == nil { t.Fatalf("Expected parse error for size too large") } } diff --git a/server/reload.go b/server/reload.go index 89553be6..0d5dfac5 100644 --- a/server/reload.go +++ b/server/reload.go @@ -40,6 +40,10 @@ type option interface { // IsLoggingChange indicates if this option requires reloading the logger. IsLoggingChange() bool + // IsTraceLevelChange indicates if this option requires reloading cached trace level. + // Clients store trace level separately. + IsTraceLevelChange() bool + // IsAuthChange indicates if this option requires reloading authorization. IsAuthChange() bool @@ -55,6 +59,10 @@ func (n noopOption) IsLoggingChange() bool { return false } +func (n noopOption) IsTraceLevelChange() bool { + return false +} + func (n noopOption) IsAuthChange() bool { return false } @@ -73,9 +81,19 @@ func (l loggingOption) IsLoggingChange() bool { return true } +// traceLevelOption is a base struct that provides default option behaviors for +// tracelevel-related options. +type traceLevelOption struct { + loggingOption +} + +func (l traceLevelOption) IsTraceLevelChange() bool { + return true +} + // traceOption implements the option interface for the `trace` setting. type traceOption struct { - loggingOption + traceLevelOption newValue bool } @@ -84,6 +102,17 @@ func (t *traceOption) Apply(server *Server) { server.Noticef("Reloaded: trace = %v", t.newValue) } +// traceOption implements the option interface for the `trace` setting. +type traceVerboseOption struct { + traceLevelOption + newValue bool +} + +// Apply is a no-op because logging will be reloaded after options are applied. +func (t *traceVerboseOption) Apply(server *Server) { + server.Noticef("Reloaded: trace_verbose = %v", t.newValue) +} + // debugOption implements the option interface for the `debug` setting. type debugOption struct { loggingOption @@ -676,6 +705,8 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { continue } switch strings.ToLower(field.Name) { + case "traceverbose": + diffOpts = append(diffOpts, &traceVerboseOption{newValue: newValue.(bool)}) case "trace": diffOpts = append(diffOpts, &traceOption{newValue: newValue.(bool)}) case "debug": @@ -819,12 +850,16 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { reloadLogging = false reloadAuth = false reloadClusterPerms = false + reloadClientTrcLvl = false ) for _, opt := range opts { opt.Apply(s) if opt.IsLoggingChange() { reloadLogging = true } + if opt.IsTraceLevelChange() { + reloadClientTrcLvl = true + } if opt.IsAuthChange() { reloadAuth = true } @@ -836,6 +871,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { if reloadLogging { s.ConfigureLogger() } + if reloadClientTrcLvl { + s.reloadClientTraceLevel() + } if reloadAuth { s.reloadAuthorization() } @@ -846,6 +884,49 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { s.Noticef("Reloaded server configuration") } +// Update all cached debug and trace settings for every client +func (s *Server) reloadClientTraceLevel() { + opts := s.getOpts() + + if opts.NoLog { + return + } + + update := func(c *client) { + // client.trace is commonly read while holding the lock + c.mu.Lock() + defer c.mu.Unlock() + c.setTraceLevel() + } + + // Iterate over every client and update + // If this is not timely AND unexpected during a reload (with changes in trace level): + // Consider storing clients in a list, ten iterate without holding locks + // Or usage of sync.Map to store clients in the first place + + if s.eventsEnabled() { + update(s.sys.client) + } + + s.mu.Lock() + cMaps := []map[uint64]*client{s.clients, s.grTmpClients, s.routes, s.leafs} + for _, m := range cMaps { + for _, c := range m { + update(c) + } + } + s.mu.Unlock() + + s.gateway.RLock() + for _, c := range s.gateway.in { + update(c) + } + for _, c := range s.gateway.outo { + update(c) + } + s.gateway.RUnlock() +} + // reloadAuthorization reconfigures the server authorization settings, // disconnects any clients who are no longer authorized, and removes any // unauthorized subscriptions. diff --git a/server/reload_test.go b/server/reload_test.go index 89aa0e7c..888e6cd6 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -20,7 +20,6 @@ import ( "encoding/json" "flag" "fmt" - "github.com/nats-io/jwt" "io/ioutil" "log" "net" @@ -35,6 +34,8 @@ import ( "testing" "time" + "github.com/nats-io/jwt" + "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" ) @@ -3569,6 +3570,30 @@ func TestConfigReloadBoolFlags(t *testing.T) { false, func() bool { return opts.Debug && opts.Trace }, }, + { + "trace_verbose_true_in_config_override_true", + `trace_verbose: true + `, + nil, + true, + func() bool { return opts.Trace && opts.TraceVerbose }, + }, + { + "trace_verbose_true_in_config_override_false", + `trace_verbose: true + `, + []string{"--VV=false"}, + true, + func() bool { return !opts.TraceVerbose }, + }, + { + "trace_verbose_true_in_config_override_false", + `trace_verbose: false + `, + []string{"--VV=true"}, + true, + func() bool { return opts.TraceVerbose }, + }, } { t.Run(test.name, func(t *testing.T) { conf := createConfFile(t, []byte(fmt.Sprintf(template, test.content))) @@ -3973,3 +3998,126 @@ func TestConfigReloadAccountResolverTLSConfig(t *testing.T) { t.Fatalf("Account name did not match claim key") } } + +func TestLoggingReload(t *testing.T) { + // This test basically starts a server and causes it's configuration to be reloaded 3 times. + // Each time, a new log file is created and trace levels are turned, off - on - off. + + // At the end of the test, all 3 log files are inspected for certain traces. + countMatches := func(log []byte, stmts ...string) int { + matchCnt := 0 + for _, stmt := range stmts { + if strings.Contains(string(log), stmt) { + matchCnt++ + } + } + return matchCnt + } + + traces := []string{"[TRC]", "[DBG]", "SYSTEM", "MSG_PAYLOAD", "$SYS.SERVER.ACCOUNT"} + + didTrace := func(log []byte) bool { + return countMatches(log, "[INF] Reloaded server configuration") == 1 + } + + tracingAbsent := func(log []byte) bool { + return countMatches(log, traces...) == 0 && didTrace(log) + } + + tracingPresent := func(log []byte) bool { + return len(traces) == countMatches(log, traces...) && didTrace(log) + } + + check := func(filename string, valid func([]byte) bool) { + log, err := ioutil.ReadFile(filename) + if err != nil { + t.Fatalf("Error reading log file %s: %v\n", filename, err) + } + if !valid(log) { + t.Fatalf("%s is not valid: %s", filename, log) + } + //t.Logf("%s contains: %s\n", filename, log) + } + + // common configuration setting up system accounts. trace_verbose needs this to cause traces + commonCfg := ` + port: -1 + system_account: sys + accounts { + sys { users = [ {user: sys, pass: "" } ] } + nats.io: { users = [ { user : bar, pass: "pwd" } ] } + } + ` + + conf := createConfFile(t, []byte(commonCfg)) + defer os.Remove(conf) + + s, opts := RunServerWithConfig(conf) + defer s.Shutdown() + + reload := func(change string) { + changeCurrentConfigContentWithNewContent(t, conf, []byte(commonCfg+` + `+change+` + `)) + + if err := s.Reload(); err != nil { + t.Fatalf("Error during reload: %v", err) + } + } + + traffic := func(cnt int) { + // Create client and sub interest on server and create traffic + urlSeed := fmt.Sprintf("nats://bar:pwd@%s:%d/", opts.Host, opts.Port) + nc, err := nats.Connect(urlSeed) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + + msgs := make(chan *nats.Msg) + defer close(msgs) + + sub, err := nc.ChanSubscribe("foo", msgs) + if err != nil { + t.Fatalf("Error creating subscriber: %v\n", err) + } + + nc.Flush() + + for i := 0; i < cnt; i++ { + if err := nc.Publish("foo", []byte("bar")); err == nil { + <-msgs + } + } + + sub.Unsubscribe() + nc.Close() + } + + defer os.Remove("off-pre.log") + reload("log_file: off-pre.log") + + traffic(10) // generate NO trace/debug entries in off-pre.log + + defer os.Remove("on.log") + reload(` + log_file: on.log + debug: true + trace_verbose: true + `) + + traffic(10) // generate trace/debug entries in on.log + + defer os.Remove("off-post.log") + reload(` + log_file: off-post.log + debug: false + trace_verbose: false + `) + + traffic(10) // generate trace/debug entries in off-post.log + + // check resulting log files for expected content + check("off-pre.log", tracingAbsent) + check("on.log", tracingPresent) + check("off-post.log", tracingAbsent) +} diff --git a/server/route.go b/server/route.go index 0c190c6d..b0470c33 100644 --- a/server/route.go +++ b/server/route.go @@ -139,8 +139,10 @@ func (c *client) removeReplySubTimeout(sub *subscription) { } } -func (c *client) processAccountSub(arg []byte) error { - c.traceInOp("A+", arg) +func (c *client) processAccountSub(arg []byte, trace bool) error { + if trace { + c.traceInOp("A+", arg) + } accName := string(arg) if c.kind == GATEWAY { return c.processGatewayAccountSub(accName) @@ -157,7 +159,7 @@ func (c *client) processAccountUnsub(arg []byte) { } // Process an inbound RMSG specification from the remote route. -func (c *client) processRoutedMsgArgs(trace bool, arg []byte) error { +func (c *client) processRoutedMsgArgs(arg []byte, trace bool) error { if trace { c.traceInOp("RMSG", arg) } @@ -232,13 +234,13 @@ func (c *client) processRoutedMsgArgs(trace bool, arg []byte) error { } // processInboundRouteMsg is called to process an inbound msg from a route. -func (c *client) processInboundRoutedMsg(msg []byte) { +func (c *client) processInboundRoutedMsg(msg []byte, trace bool) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. c.in.bytes += int32(len(msg) - LEN_CR_LF) - if c.trace { + if trace { c.traceMsg(msg) } @@ -666,8 +668,10 @@ func (c *client) removeRemoteSubs() { } } -func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) { - c.traceInOp("RS-", arg) +func (c *client) parseUnsubProto(trace bool, arg []byte) (string, []byte, []byte, error) { + if trace { + c.traceInOp("RS-", arg) + } // Indicate any activity, so pub and sub or unsubs. c.in.subs++ @@ -686,12 +690,12 @@ func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) { } // Indicates no more interest in the given account/subject for the remote side. -func (c *client) processRemoteUnsub(arg []byte) (err error) { +func (c *client) processRemoteUnsub(arg []byte, trace bool) (err error) { srv := c.srv if srv == nil { return nil } - accountName, subject, _, err := c.parseUnsubProto(arg) + accountName, subject, _, err := c.parseUnsubProto(trace, arg) if err != nil { return fmt.Errorf("processRemoteUnsub %s", err.Error()) } @@ -736,8 +740,10 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) { return nil } -func (c *client) processRemoteSub(argo []byte) (err error) { - c.traceInOp("RS+", argo) +func (c *client) processRemoteSub(argo []byte, trace bool) (err error) { + if trace { + c.traceInOp("RS+", argo) + } // Indicate activity. c.in.subs++ diff --git a/server/server.go b/server/server.go index eca22485..18de4453 100644 --- a/server/server.go +++ b/server/server.go @@ -157,7 +157,7 @@ type Server struct { logger Logger trace int32 debug int32 - traceSysAcc bool + traceSysAcc int32 } clientConnectURLs []string