From 651d169dfdd68f56ea22445b5d006e5e6d629653 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 9 Jan 2015 18:47:16 -0800 Subject: [PATCH] Changes to trace logging output, bug fixes --- server/client.go | 67 ++++++++++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 25 deletions(-) diff --git a/server/client.go b/server/client.go index c6ab6848..9d42d94b 100644 --- a/server/client.go +++ b/server/client.go @@ -1,4 +1,4 @@ -// Copyright 2012-2014 Apcera Inc. All rights reserved. +// Copyright 2012-2015 Apcera Inc. All rights reserved. package server @@ -165,7 +165,7 @@ func (c *client) readLoop() { err := cp.bw.Flush() cp.nc.SetWriteDeadline(time.Time{}) if err != nil { - Debugf("Error flushing: %v", err) + c.Debugf("Error flushing: %v", err) cp.mu.Unlock() cp.closeConnection() cp.mu.Lock() @@ -185,22 +185,28 @@ func (c *client) traceMsg(msg []byte) { if trace == 0 { return } - - pm := fmt.Sprintf("Processing %s msg: %d", c.typeString(), c.inMsgs) - opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg[:len(msg)-LEN_CR_LF])} - c.Tracef("MSG: %s", opa) + // FIXME(dlc), allow limits to printable payload + c.Tracef("->> MSG_PAYLOAD: [%s]", string(msg[:len(msg)-LEN_CR_LF])) } -func (c *client) traceOp(op string, arg []byte) { +func (c *client) traceInOp(op string, arg []byte) { + c.traceOp("->> %s", op, arg) +} + +func (c *client) traceOutOp(op string, arg []byte) { + c.traceOp("<<- %s", op, arg) +} + +func (c *client) traceOp(format, op string, arg []byte) { if trace == 0 { return } - opa := []interface{}{fmt.Sprintf("%s OP", op)} + opa := []interface{}{op} if arg != nil { - opa = append(opa, fmt.Sprintf("%s %s", op, string(arg))) + opa = append(opa, string(arg)) } - c.Tracef("OP: %s", opa) + c.Tracef(format, opa) } // Process the info message if we are a route. @@ -245,7 +251,7 @@ func (c *client) processErr(errStr string) { } func (c *client) processConnect(arg []byte) error { - c.traceOp("CONNECT", arg) + c.traceInOp("CONNECT", arg) // This will be resolved regardless before we exit this func, // so we can just clear it here. @@ -301,10 +307,11 @@ func (c *client) sendOK() { } func (c *client) processPing() { - c.traceOp("PING", nil) + c.traceInOp("PING", nil) if c.nc == nil { return } + c.traceOutOp("PONG", nil) c.mu.Lock() c.bw.WriteString("PONG\r\n") err := c.bw.Flush() @@ -316,15 +323,15 @@ func (c *client) processPing() { } func (c *client) processPong() { - c.traceOp("PONG", nil) + c.traceInOp("PONG", nil) c.mu.Lock() c.pout -= 1 c.mu.Unlock() } func (c *client) processMsgArgs(arg []byte) error { - if trace == 0 { - c.traceOp("MSG", arg) + if trace == 1 { + c.traceInOp("MSG", arg) } // Unroll splitArgs to avoid runtime/heap issues @@ -370,8 +377,8 @@ func (c *client) processMsgArgs(arg []byte) error { } func (c *client) processPub(arg []byte) error { - if trace == 0 { - c.traceOp("PUB", arg) + if trace == 1 { + c.traceInOp("PUB", arg) } // Unroll splitArgs to avoid runtime/heap issues @@ -442,7 +449,7 @@ func splitArg(arg []byte) [][]byte { } func (c *client) processSub(argo []byte) (err error) { - c.traceOp("SUB", argo) + c.traceInOp("SUB", argo) // Copy so we do not reference a potentially large buffer arg := make([]byte, len(argo)) copy(arg, argo) @@ -492,7 +499,7 @@ func (c *client) unsubscribe(sub *subscription) { string(sub.subject), sub.max, sub.nm) return } - c.traceOp("DELSUB", sub.sid) + c.traceOp("<-> %s", "DELSUB", sub.sid) c.subs.Remove(sub.sid) if c.srv != nil { c.srv.sl.Remove(sub.subject, sub) @@ -500,7 +507,7 @@ func (c *client) unsubscribe(sub *subscription) { } func (c *client) processUnsub(arg []byte) error { - c.traceOp("UNSUB", arg) + c.traceInOp("UNSUB", arg) args := splitArg(arg) var sid []byte max := -1 @@ -564,11 +571,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { // still process the message in hand, otherwise // unsubscribe and drop message on the floor. if sub.nm == sub.max { + c.Debugf("Auto-unsubscribe limit of %d reached for sid:%s\n", sub.max, string(sub.sid)) defer client.unsubscribe(sub) if shouldForward { defer client.srv.broadcastUnSubscribe(sub) } } else if sub.nm > sub.max { + c.Debugf("Auto-unsubscribe limit [%d] exceeded\n", sub.max) client.mu.Unlock() client.unsubscribe(sub) if shouldForward { @@ -615,6 +624,10 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { goto writeErr } + if trace == 1 { + c.traceOutOp(string(mh[:len(mh)-LEN_CR_LF]), nil) + } + // TODO(dlc) - Do we need this or can we just call always? if deadlineSet { client.nc.SetWriteDeadline(time.Time{}) @@ -657,7 +670,7 @@ func (c *client) processMsg(msg []byte) { atomic.AddInt64(&srv.inBytes, msgSize) } - if trace > 0 { + if trace == 1 { c.traceMsg(msg) } if srv == nil { @@ -775,7 +788,7 @@ func (c *client) processPingTimer() { return } - c.Debugf("Client Ping Timer") + c.Debugf("%s Ping Timer", c.typeString()) // Check for violation c.pout += 1 @@ -789,11 +802,13 @@ func (c *client) processPingTimer() { return } + c.traceOutOp("PING", nil) + // Send PING c.bw.WriteString("PING\r\n") err := c.bw.Flush() if err != nil { - Debugf("Error on Client Ping Flush, error %s", err) + c.Debugf("Error on Client Ping Flush, error %s", err) c.clearConnection() } else { // Reset to fire again if all OK. @@ -902,15 +917,17 @@ func (c *client) closeConnection() { defer srv.mu.Unlock() rid := c.route.remoteID if rid != "" && srv.remotes[rid] != nil { - c.Debugf("Not attempting reconnect for solicited route, already connected. Try %d", rid) + Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid) return } else { - c.Debugf("Attempting reconnect for solicited route") + Debugf("Attempting reconnect for solicited route \"%s\"", c.route.url) go srv.reConnectToRoute(c.route.url) } } } +// Logging functionality scoped to a client or route. + func (c *client) Errorf(format string, v ...interface{}) { format = fmt.Sprintf("%s - %s", c, format) Errorf(format, v...)