Added base logging/tracing

This commit is contained in:
Derek Collison
2012-11-13 11:26:53 -08:00
parent d71c2414cf
commit c3eba76367
5 changed files with 216 additions and 54 deletions

View File

@@ -10,25 +10,38 @@ import (
"github.com/apcera/gnatsd/server"
)
var port = server.DEFAULT_PORT
var host = server.DEFAULT_HOST
func main() {
// Parse flags
// logging setup
server.LogSetup()
flag.IntVar(&port, "port", server.DEFAULT_PORT, "Port to listen on.")
flag.IntVar(&port, "p", server.DEFAULT_PORT, "Port to listen on.")
flag.StringVar(&host, "host", server.DEFAULT_HOST, "Network host to listen on.")
flag.StringVar(&host, "h", server.DEFAULT_HOST, "Network host to listen on.")
opts := server.Options{}
var debugAndTrace bool
// Parse flags
flag.IntVar(&opts.Port, "port", server.DEFAULT_PORT, "Port to listen on.")
flag.IntVar(&opts.Port, "p", server.DEFAULT_PORT, "Port to listen on.")
flag.StringVar(&opts.Host, "host", server.DEFAULT_HOST, "Network host to listen on.")
flag.StringVar(&opts.Host, "h", server.DEFAULT_HOST, "Network host to listen on.")
flag.BoolVar(&opts.Debug, "D", false, "Enable Debug logging.")
flag.BoolVar(&opts.Debug, "debug", false, "Enable Debug logging.")
flag.BoolVar(&opts.Trace, "V", false, "Enable Trace logging.")
flag.BoolVar(&opts.Trace, "trace", false, "Enable Trace logging.")
flag.BoolVar(&debugAndTrace, "DV", false, "Enable Debug and Trace logging.")
flag.Parse()
if debugAndTrace {
opts.Trace, opts.Debug = true, true
}
// Profiler
go func() {
log.Println(http.ListenAndServe("localhost:6062", nil))
}()
// Parse config if given
log.Println("starting up!")
s := server.New()
s.AcceptLoop(host, port)
}
// Parse config if given
s := server.New(opts)
s.AcceptLoop()
}

View File

@@ -17,7 +17,7 @@ import (
// The size of the bufio reader/writer on top of the socket.
//const defaultBufSize = 32768
const defaultBufSize = 65536
const defaultBufSize = 32768
type client struct {
mu sync.Mutex
@@ -79,8 +79,24 @@ func (c *client) readLoop() {
}
}
func (c *client) traceMsg(msg []byte) {
opa := []interface{}{"Processing msg", string(c.pa.subject), string(c.pa.reply), string(msg)}
Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid))
}
func (c *client) traceOp(op string, arg []byte) {
if !trace {
return
}
opa := []interface{}{fmt.Sprintf("%s OP", op)}
if arg != nil {
opa = append(opa, fmt.Sprintf("%s %s", op, string(arg)))
}
Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid))
}
func (c *client) processConnect(arg []byte) error {
// log.Printf("Got connect arg: '%s'\n", arg)
c.traceOp("CONNECT", arg)
// FIXME, check err
return json.Unmarshal(arg, &c.opts)
}
@@ -88,7 +104,7 @@ func (c *client) processConnect(arg []byte) error {
var pongResp = []byte(fmt.Sprintf("PONG%s", CR_LF))
func (c *client) processPing() {
// log.Printf("Process ping\n")
c.traceOp("PING", nil)
if c.conn == nil {
return
}
@@ -99,7 +115,7 @@ func (c *client) processPing() {
const argsLenMax = 3
func (c *client) processPub(arg []byte) error {
// log.Printf("Got pub arg: '%s'\n", arg)
c.traceOp("PUB", arg)
args := splitArg(arg)
switch len(args) {
case 2:
@@ -118,7 +134,6 @@ func (c *client) processPub(arg []byte) error {
if c.pa.size < 0 {
return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg)
}
// log.Printf("Parsed pubArg: %+v\n", c.pa)
return nil
}
@@ -146,10 +161,10 @@ func splitArg(arg []byte) [][]byte {
}
func (c *client) processSub(argo []byte) error {
c.traceOp("SUB", argo)
// Copy so we do not reference a potentially large buffer
arg := make([]byte, len(argo))
copy(arg, argo)
// log.Printf("Got sub arg for client[%v]: '%s'\n", c, arg)
args := splitArg(arg)
sub := &subscription{client: c}
switch len(args) {
@@ -189,9 +204,8 @@ func (c *client) unsubscribe(sub *subscription) {
}
func (c *client) processUnsub(arg []byte) error {
// log.Printf("Got unsub arg for client[%v]: '%s'\n", c, arg)
c.traceOp("UNSUB", arg)
args := splitArg(arg)
var sid []byte
max := -1
@@ -204,11 +218,12 @@ func (c *client) processUnsub(arg []byte) error {
default:
return fmt.Errorf("processUnsub Parse Error: '%s'", arg)
}
sub := (c.subs.Get(sid)).(*subscription)
if max > 0 {
sub.max = int64(max)
if sub, ok := (c.subs.Get(sid)).(*subscription); ok {
if max > 0 {
sub.max = int64(max)
}
c.unsubscribe(sub)
}
c.unsubscribe(sub)
return nil
}
@@ -248,6 +263,7 @@ func (sub *subscription) deliverMsg(mh, msg []byte) {
// go flusher routine. Single for all connections?
func (c *client) processMsg(msg []byte) {
c.traceMsg(msg)
c.nm++
if c.srv == nil {
return
@@ -290,7 +306,8 @@ func (c *client) closeConnection() {
if c.conn == nil {
return
}
// log.Printf("Closing Connection: %v\n", c)
Debug("Client connection closed", clientConnStr(c.conn), c.cid)
// c.bw.Flush()
c.conn.Close()
c.conn = nil

View File

@@ -7,7 +7,7 @@ import (
)
const (
VERSION = "0.1.0.alpha.1"
VERSION = "go 0.1.0.alpha.1"
DEFAULT_PORT = 4222
DEFAULT_HOST = "0.0.0.0"

98
server/log.go Normal file
View File

@@ -0,0 +1,98 @@
// Copyright 2012 Apcera Inc. All rights reserved.
package server
import (
"fmt"
"log"
"strings"
)
// logging functionality, compatible with original nats-server
var trace bool
var debug bool
func LogSetup() {
log.SetFlags(0)
}
func (s *Server) LogInit() {
if s.opts.Logtime {
log.SetFlags(log.LstdFlags)
}
if s.opts.Trace {
Log(s.opts)
}
if s.opts.Debug {
debug = true
Log("DEBUG is on")
}
if s.opts.Trace {
trace = true
Log("TRACE is on")
}
}
func alreadyFormatted(s string) bool {
return strings.HasPrefix(s, "[")
}
func logStr(v []interface{}) string {
args := make([]string, 0, len(v))
for _, vt := range v {
switch t := vt.(type) {
case string:
if alreadyFormatted(t) {
args = append(args, t)
} else {
t = strings.Replace(t, "\"", "\\\"", -1)
args = append(args, fmt.Sprintf("\"%s\"", t))
}
default:
args = append(args, fmt.Sprintf("%+v", vt))
}
}
return fmt.Sprintf("[%s]", strings.Join(args,", "))
}
func Log(v ...interface{}) {
log.Print(logStr(v))
}
func Logf(format string, v ...interface{}) {
Log(fmt.Sprintf(format, v...))
}
func Fatal(v ...interface{}) {
log.Fatalf(logStr(v))
}
func Fatalf(format string, v ...interface{}) {
Fatal(fmt.Sprintf(format, v...))
}
func Debug(v ...interface{}) {
if debug {
Log(v...)
}
}
func DebugF(format string, v ...interface{}) {
if debug {
Debug(fmt.Sprintf(format, v...))
}
}
func Trace(v ...interface{}) {
if trace {
Log(v...)
}
}
func TraceF(format string, v ...interface{}) {
if trace {
Trace(fmt.Sprintf(format, v...))
}
}

View File

@@ -6,7 +6,6 @@ import (
"bufio"
"encoding/json"
"fmt"
"log"
"net"
"sync/atomic"
@@ -14,11 +13,20 @@ import (
"github.com/apcera/gnatsd/sublist"
)
type Options struct {
Host string
Port int
Trace bool
Debug bool
Logtime bool
MaxConn int
}
type info struct {
Id string `json:"server_id"`
Version string `json:"version"`
Host string `json:"host"`
Port uint `json:"port"`
Port int `json:"port"`
AuthRequired bool `json:"auth_required"`
SslRequired bool `json:"ssl_required"`
MaxPayload int `json:"max_payload"`
@@ -29,44 +37,61 @@ type Server struct {
infoJson []byte
sl *sublist.Sublist
gcid uint64
opts Options
trace bool
debug bool
}
func New() *Server {
s := &Server{
info: info{
Id: genId(),
Version: VERSION,
Host: DEFAULT_HOST,
Port: DEFAULT_PORT,
AuthRequired: false,
SslRequired: false,
MaxPayload: MAX_PAYLOAD_SIZE,
},
sl: sublist.New(),
func optionDefaults(opt *Options) {
if opt.MaxConn == 0 {
opt.MaxConn = DEFAULT_MAX_CONNECTIONS
}
}
func New(opts Options) *Server {
optionDefaults(&opts)
inf := info{
Id: genId(),
Version: VERSION,
Host: opts.Host,
Port: opts.Port,
AuthRequired: false,
SslRequired: false,
MaxPayload: MAX_PAYLOAD_SIZE,
}
s := &Server{
info: inf,
sl: sublist.New(),
opts: opts,
debug: opts.Debug,
trace: opts.Trace,
}
// Setup logging with flags
s.LogInit()
// Generate the info json
b, err := json.Marshal(s.info)
if err != nil {
log.Fatalf("Err marshalling INFO JSON: %+v\n", err)
Fatalf("Err marshalling INFO JSON: %+v\n", err)
}
s.infoJson = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF))
return s
}
func (s *Server) AcceptLoop(host string, port int) {
hp := fmt.Sprintf("%s:%d", host, port)
func (s *Server) AcceptLoop() {
Logf("Starting nats-server version %s on port %d", VERSION, s.opts.Port)
hp := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port)
l, e := net.Listen("tcp", hp)
if e != nil {
println(e)
Fatalf("Error listening on port: %d - %v", s.opts.Port, e)
return
}
log.Println("Listening on ", l.Addr())
for {
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Printf("Accept error: %v", err)
Logf("Accept error: %v", err)
}
continue
}
@@ -74,21 +99,30 @@ func (s *Server) AcceptLoop(host string, port int) {
}
}
func clientConnStr(conn net.Conn) interface{} {
if ip, ok := conn.(*net.TCPConn); ok {
addr := ip.RemoteAddr().(*net.TCPAddr)
return []string{fmt.Sprintf("%v, %d", addr.IP, addr.Port)}
}
return "N/A"
}
func (s *Server) createClient(conn net.Conn) *client {
c := &client{srv: s, conn: conn}
c.cid = atomic.AddUint64(&s.gcid, 1)
// log.Printf("Creating Client: %+v\n", c)
c.bw = bufio.NewWriterSize(c.conn, defaultBufSize)
c.br = bufio.NewReaderSize(c.conn, defaultBufSize)
c.subs = hashmap.New()
/*
if ipc := conn.(*net.TCPConn) ; ipc != nil {
ipc.SetReadBuffer(65536)
}
*/
if ip, ok := conn.(*net.TCPConn); ok {
ip.SetReadBuffer(32768)
}
s.sendInfo(c)
go c.readLoop()
Debug("Client connection created", clientConnStr(conn), c.cid)
return c
}