Changes to trace logging output, bug fixes

This commit is contained in:
Derek Collison
2015-01-09 18:47:16 -08:00
parent eaf20c2667
commit 651d169dfd

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2014 Apcera Inc. All rights reserved.
// Copyright 2012-2015 Apcera Inc. All rights reserved.
package server
@@ -165,7 +165,7 @@ func (c *client) readLoop() {
err := cp.bw.Flush()
cp.nc.SetWriteDeadline(time.Time{})
if err != nil {
Debugf("Error flushing: %v", err)
c.Debugf("Error flushing: %v", err)
cp.mu.Unlock()
cp.closeConnection()
cp.mu.Lock()
@@ -185,22 +185,28 @@ func (c *client) traceMsg(msg []byte) {
if trace == 0 {
return
}
pm := fmt.Sprintf("Processing %s msg: %d", c.typeString(), c.inMsgs)
opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg[:len(msg)-LEN_CR_LF])}
c.Tracef("MSG: %s", opa)
// FIXME(dlc), allow limits to printable payload
c.Tracef("->> MSG_PAYLOAD: [%s]", string(msg[:len(msg)-LEN_CR_LF]))
}
func (c *client) traceOp(op string, arg []byte) {
func (c *client) traceInOp(op string, arg []byte) {
c.traceOp("->> %s", op, arg)
}
func (c *client) traceOutOp(op string, arg []byte) {
c.traceOp("<<- %s", op, arg)
}
func (c *client) traceOp(format, op string, arg []byte) {
if trace == 0 {
return
}
opa := []interface{}{fmt.Sprintf("%s OP", op)}
opa := []interface{}{op}
if arg != nil {
opa = append(opa, fmt.Sprintf("%s %s", op, string(arg)))
opa = append(opa, string(arg))
}
c.Tracef("OP: %s", opa)
c.Tracef(format, opa)
}
// Process the info message if we are a route.
@@ -245,7 +251,7 @@ func (c *client) processErr(errStr string) {
}
func (c *client) processConnect(arg []byte) error {
c.traceOp("CONNECT", arg)
c.traceInOp("CONNECT", arg)
// This will be resolved regardless before we exit this func,
// so we can just clear it here.
@@ -301,10 +307,11 @@ func (c *client) sendOK() {
}
func (c *client) processPing() {
c.traceOp("PING", nil)
c.traceInOp("PING", nil)
if c.nc == nil {
return
}
c.traceOutOp("PONG", nil)
c.mu.Lock()
c.bw.WriteString("PONG\r\n")
err := c.bw.Flush()
@@ -316,15 +323,15 @@ func (c *client) processPing() {
}
func (c *client) processPong() {
c.traceOp("PONG", nil)
c.traceInOp("PONG", nil)
c.mu.Lock()
c.pout -= 1
c.mu.Unlock()
}
func (c *client) processMsgArgs(arg []byte) error {
if trace == 0 {
c.traceOp("MSG", arg)
if trace == 1 {
c.traceInOp("MSG", arg)
}
// Unroll splitArgs to avoid runtime/heap issues
@@ -370,8 +377,8 @@ func (c *client) processMsgArgs(arg []byte) error {
}
func (c *client) processPub(arg []byte) error {
if trace == 0 {
c.traceOp("PUB", arg)
if trace == 1 {
c.traceInOp("PUB", arg)
}
// Unroll splitArgs to avoid runtime/heap issues
@@ -442,7 +449,7 @@ func splitArg(arg []byte) [][]byte {
}
func (c *client) processSub(argo []byte) (err error) {
c.traceOp("SUB", argo)
c.traceInOp("SUB", argo)
// Copy so we do not reference a potentially large buffer
arg := make([]byte, len(argo))
copy(arg, argo)
@@ -492,7 +499,7 @@ func (c *client) unsubscribe(sub *subscription) {
string(sub.subject), sub.max, sub.nm)
return
}
c.traceOp("DELSUB", sub.sid)
c.traceOp("<-> %s", "DELSUB", sub.sid)
c.subs.Remove(sub.sid)
if c.srv != nil {
c.srv.sl.Remove(sub.subject, sub)
@@ -500,7 +507,7 @@ func (c *client) unsubscribe(sub *subscription) {
}
func (c *client) processUnsub(arg []byte) error {
c.traceOp("UNSUB", arg)
c.traceInOp("UNSUB", arg)
args := splitArg(arg)
var sid []byte
max := -1
@@ -564,11 +571,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
// still process the message in hand, otherwise
// unsubscribe and drop message on the floor.
if sub.nm == sub.max {
c.Debugf("Auto-unsubscribe limit of %d reached for sid:%s\n", sub.max, string(sub.sid))
defer client.unsubscribe(sub)
if shouldForward {
defer client.srv.broadcastUnSubscribe(sub)
}
} else if sub.nm > sub.max {
c.Debugf("Auto-unsubscribe limit [%d] exceeded\n", sub.max)
client.mu.Unlock()
client.unsubscribe(sub)
if shouldForward {
@@ -615,6 +624,10 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
goto writeErr
}
if trace == 1 {
c.traceOutOp(string(mh[:len(mh)-LEN_CR_LF]), nil)
}
// TODO(dlc) - Do we need this or can we just call always?
if deadlineSet {
client.nc.SetWriteDeadline(time.Time{})
@@ -657,7 +670,7 @@ func (c *client) processMsg(msg []byte) {
atomic.AddInt64(&srv.inBytes, msgSize)
}
if trace > 0 {
if trace == 1 {
c.traceMsg(msg)
}
if srv == nil {
@@ -775,7 +788,7 @@ func (c *client) processPingTimer() {
return
}
c.Debugf("Client Ping Timer")
c.Debugf("%s Ping Timer", c.typeString())
// Check for violation
c.pout += 1
@@ -789,11 +802,13 @@ func (c *client) processPingTimer() {
return
}
c.traceOutOp("PING", nil)
// Send PING
c.bw.WriteString("PING\r\n")
err := c.bw.Flush()
if err != nil {
Debugf("Error on Client Ping Flush, error %s", err)
c.Debugf("Error on Client Ping Flush, error %s", err)
c.clearConnection()
} else {
// Reset to fire again if all OK.
@@ -902,15 +917,17 @@ func (c *client) closeConnection() {
defer srv.mu.Unlock()
rid := c.route.remoteID
if rid != "" && srv.remotes[rid] != nil {
c.Debugf("Not attempting reconnect for solicited route, already connected. Try %d", rid)
Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid)
return
} else {
c.Debugf("Attempting reconnect for solicited route")
Debugf("Attempting reconnect for solicited route \"%s\"", c.route.url)
go srv.reConnectToRoute(c.route.url)
}
}
}
// Logging functionality scoped to a client or route.
func (c *client) Errorf(format string, v ...interface{}) {
format = fmt.Sprintf("%s - %s", c, format)
Errorf(format, v...)