From 6a1c3fc29b0cf1991ba5ea1157528d293b6234ff Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Wed, 4 Mar 2020 17:31:18 -0500 Subject: [PATCH] Moving inbound tracing to the caller (client.parse) Tracing for outgoing operations is always done while holding the client lock. Signed-off-by: Matthias Hanel --- server/client.go | 69 ++++++++++--------------------- server/events.go | 21 ++++++---- server/gateway.go | 16 ++------ server/leafnode.go | 24 ++--------- server/parser.go | 94 +++++++++++++++++++++++++++++++++++-------- server/parser_test.go | 4 +- server/route.go | 33 ++++----------- 7 files changed, 130 insertions(+), 131 deletions(-) diff --git a/server/client.go b/server/client.go index 099d0d9a..005771da 100644 --- a/server/client.go +++ b/server/client.go @@ -1239,14 +1239,6 @@ func (c *client) traceInOp(op string, arg []byte) { c.traceOp("<<- %s", op, arg) } -// Traces an outgoing operation. -// Will check if tracing is enabled, DOES need the client lock. -func (c *client) traceOutOpIfOk(op string, arg []byte) { - if c.trace { - c.traceOutOp(op, arg) - } -} - // Traces an outgoing operation. // Will NOT check if tracing is enabled, does NOT need the client lock. func (c *client) traceOutOp(op string, arg []byte) { @@ -1339,11 +1331,7 @@ func computeRTT(start time.Time) time.Duration { return rtt } -func (c *client) processConnect(arg []byte, trace bool) error { - if trace { - c.traceInOp("CONNECT", removePassFromTrace(arg)) - } - +func (c *client) processConnect(arg []byte) error { c.mu.Lock() // If we can't stop the timer because the callback is in progress... if !c.clearAuthTimer() { @@ -1663,7 +1651,9 @@ func (c *client) enqueueProto(proto []byte) { // Assume the lock is held upon entry. func (c *client) sendPong() { - c.traceOutOpIfOk("PONG", nil) + if c.trace { + c.traceOutOp("PONG", nil) + } c.enqueueProto([]byte(pongProto)) } @@ -1697,7 +1687,9 @@ func (c *client) sendRTTPingLocked() bool { func (c *client) sendPing() { c.rttStart = time.Now() c.ping.out++ - c.traceOutOpIfOk("PING", nil) + if c.trace { + c.traceOutOp("PING", nil) + } c.enqueueProto([]byte(pingProto)) } @@ -1716,14 +1708,18 @@ func (c *client) generateClientInfoJSON(info Info) []byte { func (c *client) sendErr(err string) { c.mu.Lock() - c.traceOutOpIfOk("-ERR", []byte(err)) + if c.trace { + c.traceOutOp("-ERR", []byte(err)) + } c.enqueueProto([]byte(fmt.Sprintf(errProto, err))) c.mu.Unlock() } func (c *client) sendOK() { c.mu.Lock() - c.traceOutOpIfOk("OK", nil) + if c.trace { + c.traceOutOp("OK", nil) + } c.enqueueProto([]byte(okProto)) c.pcd[c] = needFlush c.mu.Unlock() @@ -1731,9 +1727,6 @@ func (c *client) sendOK() { func (c *client) processPing() { c.mu.Lock() - if c.trace { - c.traceInOp("PING", nil) - } if c.isClosed() { c.mu.Unlock() @@ -1793,9 +1786,6 @@ func (c *client) processPing() { func (c *client) processPong() { c.mu.Lock() - if c.trace { - c.traceInOp("PONG", nil) - } c.ping.out = 0 c.rtt = computeRTT(c.rttStart) srv := c.srv @@ -1806,11 +1796,7 @@ func (c *client) processPong() { } } -func (c *client) processPub(arg []byte, trace bool) error { - if trace { - c.traceInOp("PUB", arg) - } - +func (c *client) processPub(arg []byte) error { // Unroll splitArgs to avoid runtime/heap issues a := [MAX_PUB_ARGS][]byte{} args := a[:0] @@ -1887,11 +1873,7 @@ func splitArg(arg []byte) [][]byte { return args } -func (c *client) processSub(argo []byte, noForward bool, trace bool) (*subscription, error) { - if trace { - c.traceInOp("SUB", argo) - } - +func (c *client) processSub(argo []byte, noForward bool) (*subscription, error) { // Indicate activity. c.in.subs++ @@ -2273,10 +2255,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool } } -func (c *client) processUnsub(arg []byte, trace bool) error { - if trace { - c.traceInOp("UNSUB", arg) - } +func (c *client) processUnsub(arg []byte) error { args := splitArg(arg) var sid []byte max := -1 @@ -2730,30 +2709,26 @@ func isReservedReply(reply []byte) bool { } // This will decide to call the client code or router code. -func (c *client) processInboundMsg(msg []byte, trace bool) { +func (c *client) processInboundMsg(msg []byte) { switch c.kind { case CLIENT: - c.processInboundClientMsg(msg, trace) + c.processInboundClientMsg(msg) case ROUTER: - c.processInboundRoutedMsg(msg, trace) + c.processInboundRoutedMsg(msg) case GATEWAY: - c.processInboundGatewayMsg(msg, trace) + c.processInboundGatewayMsg(msg) case LEAF: - c.processInboundLeafMsg(msg, trace) + c.processInboundLeafMsg(msg) } } // processInboundClientMsg is called to process an inbound msg from a client. -func (c *client) processInboundClientMsg(msg []byte, trace bool) { +func (c *client) processInboundClientMsg(msg []byte) { // Update statistics // The msg includes the CR_LF, so pull back out for accounting. c.in.msgs++ c.in.bytes += int32(len(msg) - LEN_CR_LF) - if trace { - c.traceMsg(msg) - } - // Check that client (could be here with SYSTEM) is not publishing on reserved "$GNR" prefix. if c.kind == CLIENT && hasGWRoutedReplyPrefix(c.pa.subject) { c.pubPermissionViolation(c.pa.subject) diff --git a/server/events.go b/server/events.go index 972f2d64..41375439 100644 --- a/server/events.go +++ b/server/events.go @@ -268,14 +268,16 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) { trace := c.trace c.mu.Unlock() - if trace { - c.traceInOp(fmt.Sprintf( - "PUB %s %s %d", c.pa.subject, c.pa.reply, c.pa.size), nil) - } - // Add in NL b = append(b, _CRLF_...) - c.processInboundClientMsg(b, trace) + + if trace { + c.traceInOp(fmt.Sprintf("PUB %s %s %d", + c.pa.subject, c.pa.reply, c.pa.size), nil) + c.traceMsg(b) + } + + c.processInboundClientMsg(b) // See if we are doing graceful shutdown. if !pm.last { c.flushClients(0) // Never spend time in place. @@ -1024,8 +1026,13 @@ func (s *Server) systemSubscribe(subject string, internalOnly bool, cb msgHandle trace := c.trace s.mu.Unlock() + arg := []byte(subject + " " + sid) + if trace { + c.traceInOp("SUB", arg) + } + // Now create the subscription - return c.processSub([]byte(subject+" "+sid), internalOnly, trace) + return c.processSub(arg, internalOnly) } func (s *Server) sysUnsubscribe(sub *subscription) { diff --git a/server/gateway.go b/server/gateway.go index d48cd4cb..dddbc1b4 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -1722,8 +1722,8 @@ func (c *client) processGatewayAccountSub(accName string) error { // If in modeInterestOnly or for a queue sub, remove from // the sublist if present. // -func (c *client) processGatewayRUnsub(arg []byte, trace bool) error { - accName, subject, queue, err := c.parseUnsubProto(arg, trace) +func (c *client) processGatewayRUnsub(arg []byte) error { + accName, subject, queue, err := c.parseUnsubProto(arg) if err != nil { return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error()) } @@ -1813,11 +1813,7 @@ func (c *client) processGatewayRUnsub(arg []byte, trace bool) error { // For queue subs, or if in modeInterestOnly, register interest // from remote gateway. // -func (c *client) processGatewayRSub(arg []byte, trace bool) error { - if trace { - c.traceInOp("RS+", arg) - } - +func (c *client) processGatewayRSub(arg []byte) error { // Indicate activity. c.in.subs++ @@ -2727,16 +2723,12 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) { // account or subject for which there is no interest in this cluster // an A-/RS- protocol may be send back. // -func (c *client) processInboundGatewayMsg(msg []byte, trace bool) { +func (c *client) processInboundGatewayMsg(msg []byte) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. c.in.bytes += int32(len(msg) - LEN_CR_LF) - if trace { - c.traceMsg(msg) - } - if c.opts.Verbose { c.sendOK() } diff --git a/server/leafnode.go b/server/leafnode.go index 926bfeb7..f3210679 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1227,11 +1227,7 @@ func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) { } // processLeafSub will process an inbound sub request for the remote leaf node. -func (c *client) processLeafSub(argo []byte, trace bool) (err error) { - if trace { - c.traceInOp("LS+", argo) - } - +func (c *client) processLeafSub(argo []byte) (err error) { // Indicate activity. c.in.subs++ @@ -1355,11 +1351,7 @@ func (s *Server) reportLeafNodeLoop(c *client) { } // processLeafUnsub will process an inbound unsub request for the remote leaf node. -func (c *client) processLeafUnsub(arg []byte, trace bool) error { - if trace { - c.traceInOp("LS-", arg) - } - +func (c *client) processLeafUnsub(arg []byte) error { // Indicate any activity, so pub and sub or unsubs. c.in.subs++ @@ -1394,11 +1386,7 @@ func (c *client) processLeafUnsub(arg []byte, trace bool) error { return nil } -func (c *client) processLeafMsgArgs(arg []byte, trace bool) error { - if trace { - c.traceInOp("LMSG", arg) - } - +func (c *client) processLeafMsgArgs(arg []byte) error { // Unroll splitArgs to avoid runtime/heap issues a := [MAX_MSG_ARGS][]byte{} args := a[:0] @@ -1469,16 +1457,12 @@ func (c *client) processLeafMsgArgs(arg []byte, trace bool) error { } // processInboundLeafMsg is called to process an inbound msg from a leaf node. -func (c *client) processInboundLeafMsg(msg []byte, trace bool) { +func (c *client) processInboundLeafMsg(msg []byte) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. c.in.bytes += int32(len(msg) - LEN_CR_LF) - if trace { - c.traceMsg(msg) - } - // Check pub permissions if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowed(string(c.pa.subject)) { c.pubPermissionViolation(c.pa.subject) diff --git a/server/parser.go b/server/parser.go index ba1eca01..0d4712c0 100644 --- a/server/parser.go +++ b/server/parser.go @@ -221,7 +221,10 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } - if err := c.processPub(arg, trace); err != nil { + if trace { + c.traceInOp("PUB", arg) + } + if err := c.processPub(arg); err != nil { return err } c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD @@ -278,7 +281,10 @@ func (c *client) parse(buf []byte) error { } else { c.msgBuf = buf[c.as : i+1] } - c.processInboundMsg(c.msgBuf, trace) + if trace { + c.traceMsg(c.msgBuf) + } + c.processInboundMsg(c.msgBuf) c.argBuf, c.msgBuf = nil, nil c.drop, c.as, c.state = 0, i+1, OP_START // Drop all pub args @@ -320,7 +326,10 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } - if err := c.processAccountSub(arg, trace); err != nil { + if trace { + c.traceInOp("A+", arg) + } + if err := c.processAccountSub(arg); err != nil { return err } c.drop, c.as, c.state = 0, i+1, OP_START @@ -356,6 +365,9 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } + if trace { + c.traceInOp("A-", arg) + } c.processAccountUnsub(arg) c.drop, c.as, c.state = 0, i+1, OP_START default: @@ -408,13 +420,25 @@ func (c *client) parse(buf []byte) error { switch c.kind { case CLIENT: - _, err = c.processSub(arg, false, trace) + if trace { + c.traceInOp("SUB", arg) + } + _, err = c.processSub(arg, false) case ROUTER: - err = c.processRemoteSub(arg, trace) + if trace { + c.traceInOp("RS+", arg) + } + err = c.processRemoteSub(arg) case GATEWAY: - err = c.processGatewayRSub(arg, trace) + if trace { + c.traceInOp("RS+", arg) + } + err = c.processGatewayRSub(arg) case LEAF: - err = c.processLeafSub(arg, trace) + if trace { + c.traceInOp("LS+", arg) + } + err = c.processLeafSub(arg) } if err != nil { return err @@ -520,13 +544,25 @@ func (c *client) parse(buf []byte) error { switch c.kind { case CLIENT: - err = c.processUnsub(arg, trace) + if trace { + c.traceInOp("UNSUB", arg) + } + err = c.processUnsub(arg) case ROUTER: - err = c.processRemoteUnsub(arg, trace) + if trace && c.srv != nil { + c.traceInOp("RS-", arg) + } + err = c.processRemoteUnsub(arg) case GATEWAY: - err = c.processGatewayRUnsub(arg, trace) + if trace { + c.traceInOp("RS-", arg) + } + err = c.processGatewayRUnsub(arg) case LEAF: - err = c.processLeafUnsub(arg, trace) + if trace { + c.traceInOp("LS-", arg) + } + err = c.processLeafUnsub(arg) } if err != nil { return err @@ -554,6 +590,9 @@ func (c *client) parse(buf []byte) error { case OP_PING: switch b { case '\n': + if trace { + c.traceInOp("PING", nil) + } c.processPing() c.drop, c.state = 0, OP_START } @@ -574,6 +613,9 @@ func (c *client) parse(buf []byte) error { case OP_PONG: switch b { case '\n': + if trace { + c.traceInOp("PONG", nil) + } c.processPong() c.drop, c.state = 0, OP_START } @@ -639,7 +681,10 @@ func (c *client) parse(buf []byte) error { } else { arg = buf[c.as : i-c.drop] } - if err := c.processConnect(arg, trace); err != nil { + if trace { + c.traceInOp("CONNECT", removePassFromTrace(arg)) + } + if err := c.processConnect(arg); err != nil { return err } c.drop, c.state = 0, OP_START @@ -695,9 +740,15 @@ func (c *client) parse(buf []byte) error { } var err error if c.kind == ROUTER || c.kind == GATEWAY { - err = c.processRoutedMsgArgs(arg, trace) + if trace { + c.traceInOp("RMSG", arg) + } + err = c.processRoutedMsgArgs(arg) } else if c.kind == LEAF { - err = c.processLeafMsgArgs(arg, trace) + if trace { + c.traceInOp("LMSG", arg) + } + err = c.processLeafMsgArgs(arg) } if err != nil { return err @@ -929,10 +980,19 @@ func (c *client) clonePubArg(trace bool) error { switch c.kind { case ROUTER, GATEWAY: - return c.processRoutedMsgArgs(c.argBuf, trace) + if trace { + c.traceInOp("RMSG", c.argBuf) + } + return c.processRoutedMsgArgs(c.argBuf) case LEAF: - return c.processLeafMsgArgs(c.argBuf, trace) + if trace { + c.traceInOp("LMSG", c.argBuf) + } + return c.processLeafMsgArgs(c.argBuf) default: - return c.processPub(c.argBuf, trace) + if trace { + c.traceInOp("PUB", c.argBuf) + } + return c.processPub(c.argBuf) } } diff --git a/server/parser_test.go b/server/parser_test.go index 1e267157..c2834cfc 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -301,7 +301,7 @@ func TestParsePubArg(t *testing.T) { subject: "foo", reply: "", size: 2222, szb: "2222"}, } { t.Run(test.arg, func(t *testing.T) { - if err := c.processPub([]byte(test.arg), false); err != nil { + if err := c.processPub([]byte(test.arg)); err != nil { t.Fatalf("Unexpected parse error: %v\n", err) } if !bytes.Equal(c.pa.subject, []byte(test.subject)) { @@ -324,7 +324,7 @@ func TestParsePubBadSize(t *testing.T) { c := dummyClient() // Setup localized max payload c.mpay = 32768 - if err := c.processPub([]byte("foo 2222222222222222"), false); err == nil { + if err := c.processPub([]byte("foo 2222222222222222")); err == nil { t.Fatalf("Expected parse error for size too large") } } diff --git a/server/route.go b/server/route.go index fdf73764..84a111d5 100644 --- a/server/route.go +++ b/server/route.go @@ -139,10 +139,7 @@ func (c *client) removeReplySubTimeout(sub *subscription) { } } -func (c *client) processAccountSub(arg []byte, trace bool) error { - if trace { - c.traceInOp("A+", arg) - } +func (c *client) processAccountSub(arg []byte) error { accName := string(arg) if c.kind == GATEWAY { return c.processGatewayAccountSub(accName) @@ -151,7 +148,6 @@ func (c *client) processAccountSub(arg []byte, trace bool) error { } func (c *client) processAccountUnsub(arg []byte) { - c.traceInOp("A-", arg) accName := string(arg) if c.kind == GATEWAY { c.processGatewayAccountUnsub(accName) @@ -159,10 +155,7 @@ func (c *client) processAccountUnsub(arg []byte) { } // Process an inbound RMSG specification from the remote route. -func (c *client) processRoutedMsgArgs(arg []byte, trace bool) error { - if trace { - c.traceInOp("RMSG", arg) - } +func (c *client) processRoutedMsgArgs(arg []byte) error { // Unroll splitArgs to avoid runtime/heap issues a := [MAX_MSG_ARGS][]byte{} args := a[:0] @@ -234,16 +227,12 @@ func (c *client) processRoutedMsgArgs(arg []byte, trace bool) error { } // processInboundRouteMsg is called to process an inbound msg from a route. -func (c *client) processInboundRoutedMsg(msg []byte, trace bool) { +func (c *client) processInboundRoutedMsg(msg []byte) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. c.in.bytes += int32(len(msg) - LEN_CR_LF) - if trace { - c.traceMsg(msg) - } - if c.opts.Verbose { c.sendOK() } @@ -668,11 +657,7 @@ func (c *client) removeRemoteSubs() { } } -func (c *client) parseUnsubProto(arg []byte, trace bool) (string, []byte, []byte, error) { - if trace { - c.traceInOp("RS-", arg) - } - +func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) { // Indicate any activity, so pub and sub or unsubs. c.in.subs++ @@ -690,12 +675,12 @@ func (c *client) parseUnsubProto(arg []byte, trace bool) (string, []byte, []byte } // Indicates no more interest in the given account/subject for the remote side. -func (c *client) processRemoteUnsub(arg []byte, trace bool) (err error) { +func (c *client) processRemoteUnsub(arg []byte) (err error) { srv := c.srv if srv == nil { return nil } - accountName, subject, _, err := c.parseUnsubProto(arg, trace) + accountName, subject, _, err := c.parseUnsubProto(arg) if err != nil { return fmt.Errorf("processRemoteUnsub %s", err.Error()) } @@ -740,11 +725,7 @@ func (c *client) processRemoteUnsub(arg []byte, trace bool) (err error) { return nil } -func (c *client) processRemoteSub(argo []byte, trace bool) (err error) { - if trace { - c.traceInOp("RS+", argo) - } - +func (c *client) processRemoteSub(argo []byte) (err error) { // Indicate activity. c.in.subs++