Moving inbound tracing to the caller (client.parse)

Tracing for outgoing operations is always done while
holding the client lock.

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2020-03-04 17:31:18 -05:00
parent fe373ac597
commit 6a1c3fc29b
7 changed files with 130 additions and 131 deletions

View File

@@ -1239,14 +1239,6 @@ func (c *client) traceInOp(op string, arg []byte) {
c.traceOp("<<- %s", op, arg)
}
// Traces an outgoing operation.
// Will check if tracing is enabled, DOES need the client lock.
func (c *client) traceOutOpIfOk(op string, arg []byte) {
if c.trace {
c.traceOutOp(op, arg)
}
}
// Traces an outgoing operation.
// Will NOT check if tracing is enabled, does NOT need the client lock.
func (c *client) traceOutOp(op string, arg []byte) {
@@ -1339,11 +1331,7 @@ func computeRTT(start time.Time) time.Duration {
return rtt
}
func (c *client) processConnect(arg []byte, trace bool) error {
if trace {
c.traceInOp("CONNECT", removePassFromTrace(arg))
}
func (c *client) processConnect(arg []byte) error {
c.mu.Lock()
// If we can't stop the timer because the callback is in progress...
if !c.clearAuthTimer() {
@@ -1663,7 +1651,9 @@ func (c *client) enqueueProto(proto []byte) {
// Assume the lock is held upon entry.
func (c *client) sendPong() {
c.traceOutOpIfOk("PONG", nil)
if c.trace {
c.traceOutOp("PONG", nil)
}
c.enqueueProto([]byte(pongProto))
}
@@ -1697,7 +1687,9 @@ func (c *client) sendRTTPingLocked() bool {
func (c *client) sendPing() {
c.rttStart = time.Now()
c.ping.out++
c.traceOutOpIfOk("PING", nil)
if c.trace {
c.traceOutOp("PING", nil)
}
c.enqueueProto([]byte(pingProto))
}
@@ -1716,14 +1708,18 @@ func (c *client) generateClientInfoJSON(info Info) []byte {
func (c *client) sendErr(err string) {
c.mu.Lock()
c.traceOutOpIfOk("-ERR", []byte(err))
if c.trace {
c.traceOutOp("-ERR", []byte(err))
}
c.enqueueProto([]byte(fmt.Sprintf(errProto, err)))
c.mu.Unlock()
}
func (c *client) sendOK() {
c.mu.Lock()
c.traceOutOpIfOk("OK", nil)
if c.trace {
c.traceOutOp("OK", nil)
}
c.enqueueProto([]byte(okProto))
c.pcd[c] = needFlush
c.mu.Unlock()
@@ -1731,9 +1727,6 @@ func (c *client) sendOK() {
func (c *client) processPing() {
c.mu.Lock()
if c.trace {
c.traceInOp("PING", nil)
}
if c.isClosed() {
c.mu.Unlock()
@@ -1793,9 +1786,6 @@ func (c *client) processPing() {
func (c *client) processPong() {
c.mu.Lock()
if c.trace {
c.traceInOp("PONG", nil)
}
c.ping.out = 0
c.rtt = computeRTT(c.rttStart)
srv := c.srv
@@ -1806,11 +1796,7 @@ func (c *client) processPong() {
}
}
func (c *client) processPub(arg []byte, trace bool) error {
if trace {
c.traceInOp("PUB", arg)
}
func (c *client) processPub(arg []byte) error {
// Unroll splitArgs to avoid runtime/heap issues
a := [MAX_PUB_ARGS][]byte{}
args := a[:0]
@@ -1887,11 +1873,7 @@ func splitArg(arg []byte) [][]byte {
return args
}
func (c *client) processSub(argo []byte, noForward bool, trace bool) (*subscription, error) {
if trace {
c.traceInOp("SUB", argo)
}
func (c *client) processSub(argo []byte, noForward bool) (*subscription, error) {
// Indicate activity.
c.in.subs++
@@ -2273,10 +2255,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
}
}
func (c *client) processUnsub(arg []byte, trace bool) error {
if trace {
c.traceInOp("UNSUB", arg)
}
func (c *client) processUnsub(arg []byte) error {
args := splitArg(arg)
var sid []byte
max := -1
@@ -2730,30 +2709,26 @@ func isReservedReply(reply []byte) bool {
}
// This will decide to call the client code or router code.
func (c *client) processInboundMsg(msg []byte, trace bool) {
func (c *client) processInboundMsg(msg []byte) {
switch c.kind {
case CLIENT:
c.processInboundClientMsg(msg, trace)
c.processInboundClientMsg(msg)
case ROUTER:
c.processInboundRoutedMsg(msg, trace)
c.processInboundRoutedMsg(msg)
case GATEWAY:
c.processInboundGatewayMsg(msg, trace)
c.processInboundGatewayMsg(msg)
case LEAF:
c.processInboundLeafMsg(msg, trace)
c.processInboundLeafMsg(msg)
}
}
// processInboundClientMsg is called to process an inbound msg from a client.
func (c *client) processInboundClientMsg(msg []byte, trace bool) {
func (c *client) processInboundClientMsg(msg []byte) {
// Update statistics
// The msg includes the CR_LF, so pull back out for accounting.
c.in.msgs++
c.in.bytes += int32(len(msg) - LEN_CR_LF)
if trace {
c.traceMsg(msg)
}
// Check that client (could be here with SYSTEM) is not publishing on reserved "$GNR" prefix.
if c.kind == CLIENT && hasGWRoutedReplyPrefix(c.pa.subject) {
c.pubPermissionViolation(c.pa.subject)

View File

@@ -268,14 +268,16 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
trace := c.trace
c.mu.Unlock()
if trace {
c.traceInOp(fmt.Sprintf(
"PUB %s %s %d", c.pa.subject, c.pa.reply, c.pa.size), nil)
}
// Add in NL
b = append(b, _CRLF_...)
c.processInboundClientMsg(b, trace)
if trace {
c.traceInOp(fmt.Sprintf("PUB %s %s %d",
c.pa.subject, c.pa.reply, c.pa.size), nil)
c.traceMsg(b)
}
c.processInboundClientMsg(b)
// See if we are doing graceful shutdown.
if !pm.last {
c.flushClients(0) // Never spend time in place.
@@ -1024,8 +1026,13 @@ func (s *Server) systemSubscribe(subject string, internalOnly bool, cb msgHandle
trace := c.trace
s.mu.Unlock()
arg := []byte(subject + " " + sid)
if trace {
c.traceInOp("SUB", arg)
}
// Now create the subscription
return c.processSub([]byte(subject+" "+sid), internalOnly, trace)
return c.processSub(arg, internalOnly)
}
func (s *Server) sysUnsubscribe(sub *subscription) {

View File

@@ -1722,8 +1722,8 @@ func (c *client) processGatewayAccountSub(accName string) error {
// If in modeInterestOnly or for a queue sub, remove from
// the sublist if present.
// <Invoked from outbound connection's readLoop>
func (c *client) processGatewayRUnsub(arg []byte, trace bool) error {
accName, subject, queue, err := c.parseUnsubProto(arg, trace)
func (c *client) processGatewayRUnsub(arg []byte) error {
accName, subject, queue, err := c.parseUnsubProto(arg)
if err != nil {
return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error())
}
@@ -1813,11 +1813,7 @@ func (c *client) processGatewayRUnsub(arg []byte, trace bool) error {
// For queue subs, or if in modeInterestOnly, register interest
// from remote gateway.
// <Invoked from outbound connection's readLoop>
func (c *client) processGatewayRSub(arg []byte, trace bool) error {
if trace {
c.traceInOp("RS+", arg)
}
func (c *client) processGatewayRSub(arg []byte) error {
// Indicate activity.
c.in.subs++
@@ -2727,16 +2723,12 @@ func (c *client) handleGatewayReply(msg []byte) (processed bool) {
// account or subject for which there is no interest in this cluster
// an A-/RS- protocol may be send back.
// <Invoked from inbound connection's readLoop>
func (c *client) processInboundGatewayMsg(msg []byte, trace bool) {
func (c *client) processInboundGatewayMsg(msg []byte) {
// Update statistics
c.in.msgs++
// The msg includes the CR_LF, so pull back out for accounting.
c.in.bytes += int32(len(msg) - LEN_CR_LF)
if trace {
c.traceMsg(msg)
}
if c.opts.Verbose {
c.sendOK()
}

View File

@@ -1227,11 +1227,7 @@ func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
}
// processLeafSub will process an inbound sub request for the remote leaf node.
func (c *client) processLeafSub(argo []byte, trace bool) (err error) {
if trace {
c.traceInOp("LS+", argo)
}
func (c *client) processLeafSub(argo []byte) (err error) {
// Indicate activity.
c.in.subs++
@@ -1355,11 +1351,7 @@ func (s *Server) reportLeafNodeLoop(c *client) {
}
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
func (c *client) processLeafUnsub(arg []byte, trace bool) error {
if trace {
c.traceInOp("LS-", arg)
}
func (c *client) processLeafUnsub(arg []byte) error {
// Indicate any activity, so pub and sub or unsubs.
c.in.subs++
@@ -1394,11 +1386,7 @@ func (c *client) processLeafUnsub(arg []byte, trace bool) error {
return nil
}
func (c *client) processLeafMsgArgs(arg []byte, trace bool) error {
if trace {
c.traceInOp("LMSG", arg)
}
func (c *client) processLeafMsgArgs(arg []byte) error {
// Unroll splitArgs to avoid runtime/heap issues
a := [MAX_MSG_ARGS][]byte{}
args := a[:0]
@@ -1469,16 +1457,12 @@ func (c *client) processLeafMsgArgs(arg []byte, trace bool) error {
}
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
func (c *client) processInboundLeafMsg(msg []byte, trace bool) {
func (c *client) processInboundLeafMsg(msg []byte) {
// Update statistics
c.in.msgs++
// The msg includes the CR_LF, so pull back out for accounting.
c.in.bytes += int32(len(msg) - LEN_CR_LF)
if trace {
c.traceMsg(msg)
}
// Check pub permissions
if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowed(string(c.pa.subject)) {
c.pubPermissionViolation(c.pa.subject)

View File

@@ -221,7 +221,10 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processPub(arg, trace); err != nil {
if trace {
c.traceInOp("PUB", arg)
}
if err := c.processPub(arg); err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
@@ -278,7 +281,10 @@ func (c *client) parse(buf []byte) error {
} else {
c.msgBuf = buf[c.as : i+1]
}
c.processInboundMsg(c.msgBuf, trace)
if trace {
c.traceMsg(c.msgBuf)
}
c.processInboundMsg(c.msgBuf)
c.argBuf, c.msgBuf = nil, nil
c.drop, c.as, c.state = 0, i+1, OP_START
// Drop all pub args
@@ -320,7 +326,10 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processAccountSub(arg, trace); err != nil {
if trace {
c.traceInOp("A+", arg)
}
if err := c.processAccountSub(arg); err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, OP_START
@@ -356,6 +365,9 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if trace {
c.traceInOp("A-", arg)
}
c.processAccountUnsub(arg)
c.drop, c.as, c.state = 0, i+1, OP_START
default:
@@ -408,13 +420,25 @@ func (c *client) parse(buf []byte) error {
switch c.kind {
case CLIENT:
_, err = c.processSub(arg, false, trace)
if trace {
c.traceInOp("SUB", arg)
}
_, err = c.processSub(arg, false)
case ROUTER:
err = c.processRemoteSub(arg, trace)
if trace {
c.traceInOp("RS+", arg)
}
err = c.processRemoteSub(arg)
case GATEWAY:
err = c.processGatewayRSub(arg, trace)
if trace {
c.traceInOp("RS+", arg)
}
err = c.processGatewayRSub(arg)
case LEAF:
err = c.processLeafSub(arg, trace)
if trace {
c.traceInOp("LS+", arg)
}
err = c.processLeafSub(arg)
}
if err != nil {
return err
@@ -520,13 +544,25 @@ func (c *client) parse(buf []byte) error {
switch c.kind {
case CLIENT:
err = c.processUnsub(arg, trace)
if trace {
c.traceInOp("UNSUB", arg)
}
err = c.processUnsub(arg)
case ROUTER:
err = c.processRemoteUnsub(arg, trace)
if trace && c.srv != nil {
c.traceInOp("RS-", arg)
}
err = c.processRemoteUnsub(arg)
case GATEWAY:
err = c.processGatewayRUnsub(arg, trace)
if trace {
c.traceInOp("RS-", arg)
}
err = c.processGatewayRUnsub(arg)
case LEAF:
err = c.processLeafUnsub(arg, trace)
if trace {
c.traceInOp("LS-", arg)
}
err = c.processLeafUnsub(arg)
}
if err != nil {
return err
@@ -554,6 +590,9 @@ func (c *client) parse(buf []byte) error {
case OP_PING:
switch b {
case '\n':
if trace {
c.traceInOp("PING", nil)
}
c.processPing()
c.drop, c.state = 0, OP_START
}
@@ -574,6 +613,9 @@ func (c *client) parse(buf []byte) error {
case OP_PONG:
switch b {
case '\n':
if trace {
c.traceInOp("PONG", nil)
}
c.processPong()
c.drop, c.state = 0, OP_START
}
@@ -639,7 +681,10 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processConnect(arg, trace); err != nil {
if trace {
c.traceInOp("CONNECT", removePassFromTrace(arg))
}
if err := c.processConnect(arg); err != nil {
return err
}
c.drop, c.state = 0, OP_START
@@ -695,9 +740,15 @@ func (c *client) parse(buf []byte) error {
}
var err error
if c.kind == ROUTER || c.kind == GATEWAY {
err = c.processRoutedMsgArgs(arg, trace)
if trace {
c.traceInOp("RMSG", arg)
}
err = c.processRoutedMsgArgs(arg)
} else if c.kind == LEAF {
err = c.processLeafMsgArgs(arg, trace)
if trace {
c.traceInOp("LMSG", arg)
}
err = c.processLeafMsgArgs(arg)
}
if err != nil {
return err
@@ -929,10 +980,19 @@ func (c *client) clonePubArg(trace bool) error {
switch c.kind {
case ROUTER, GATEWAY:
return c.processRoutedMsgArgs(c.argBuf, trace)
if trace {
c.traceInOp("RMSG", c.argBuf)
}
return c.processRoutedMsgArgs(c.argBuf)
case LEAF:
return c.processLeafMsgArgs(c.argBuf, trace)
if trace {
c.traceInOp("LMSG", c.argBuf)
}
return c.processLeafMsgArgs(c.argBuf)
default:
return c.processPub(c.argBuf, trace)
if trace {
c.traceInOp("PUB", c.argBuf)
}
return c.processPub(c.argBuf)
}
}

View File

@@ -301,7 +301,7 @@ func TestParsePubArg(t *testing.T) {
subject: "foo", reply: "", size: 2222, szb: "2222"},
} {
t.Run(test.arg, func(t *testing.T) {
if err := c.processPub([]byte(test.arg), false); err != nil {
if err := c.processPub([]byte(test.arg)); err != nil {
t.Fatalf("Unexpected parse error: %v\n", err)
}
if !bytes.Equal(c.pa.subject, []byte(test.subject)) {
@@ -324,7 +324,7 @@ func TestParsePubBadSize(t *testing.T) {
c := dummyClient()
// Setup localized max payload
c.mpay = 32768
if err := c.processPub([]byte("foo 2222222222222222"), false); err == nil {
if err := c.processPub([]byte("foo 2222222222222222")); err == nil {
t.Fatalf("Expected parse error for size too large")
}
}

View File

@@ -139,10 +139,7 @@ func (c *client) removeReplySubTimeout(sub *subscription) {
}
}
func (c *client) processAccountSub(arg []byte, trace bool) error {
if trace {
c.traceInOp("A+", arg)
}
func (c *client) processAccountSub(arg []byte) error {
accName := string(arg)
if c.kind == GATEWAY {
return c.processGatewayAccountSub(accName)
@@ -151,7 +148,6 @@ func (c *client) processAccountSub(arg []byte, trace bool) error {
}
func (c *client) processAccountUnsub(arg []byte) {
c.traceInOp("A-", arg)
accName := string(arg)
if c.kind == GATEWAY {
c.processGatewayAccountUnsub(accName)
@@ -159,10 +155,7 @@ func (c *client) processAccountUnsub(arg []byte) {
}
// Process an inbound RMSG specification from the remote route.
func (c *client) processRoutedMsgArgs(arg []byte, trace bool) error {
if trace {
c.traceInOp("RMSG", arg)
}
func (c *client) processRoutedMsgArgs(arg []byte) error {
// Unroll splitArgs to avoid runtime/heap issues
a := [MAX_MSG_ARGS][]byte{}
args := a[:0]
@@ -234,16 +227,12 @@ func (c *client) processRoutedMsgArgs(arg []byte, trace bool) error {
}
// processInboundRouteMsg is called to process an inbound msg from a route.
func (c *client) processInboundRoutedMsg(msg []byte, trace bool) {
func (c *client) processInboundRoutedMsg(msg []byte) {
// Update statistics
c.in.msgs++
// The msg includes the CR_LF, so pull back out for accounting.
c.in.bytes += int32(len(msg) - LEN_CR_LF)
if trace {
c.traceMsg(msg)
}
if c.opts.Verbose {
c.sendOK()
}
@@ -668,11 +657,7 @@ func (c *client) removeRemoteSubs() {
}
}
func (c *client) parseUnsubProto(arg []byte, trace bool) (string, []byte, []byte, error) {
if trace {
c.traceInOp("RS-", arg)
}
func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) {
// Indicate any activity, so pub and sub or unsubs.
c.in.subs++
@@ -690,12 +675,12 @@ func (c *client) parseUnsubProto(arg []byte, trace bool) (string, []byte, []byte
}
// Indicates no more interest in the given account/subject for the remote side.
func (c *client) processRemoteUnsub(arg []byte, trace bool) (err error) {
func (c *client) processRemoteUnsub(arg []byte) (err error) {
srv := c.srv
if srv == nil {
return nil
}
accountName, subject, _, err := c.parseUnsubProto(arg, trace)
accountName, subject, _, err := c.parseUnsubProto(arg)
if err != nil {
return fmt.Errorf("processRemoteUnsub %s", err.Error())
}
@@ -740,11 +725,7 @@ func (c *client) processRemoteUnsub(arg []byte, trace bool) (err error) {
return nil
}
func (c *client) processRemoteSub(argo []byte, trace bool) (err error) {
if trace {
c.traceInOp("RS+", argo)
}
func (c *client) processRemoteSub(argo []byte) (err error) {
// Indicate activity.
c.in.subs++