// Copyright 2012 Apcera Inc. All rights reserved. package server import ( "bufio" "encoding/json" "fmt" "math/rand" "net" "sync" "time" "github.com/apcera/gnatsd/hashmap" ) // The size of the bufio reader/writer on top of the socket. //const defaultBufSize = 32768 const defaultBufSize = 32768 type client struct { mu sync.Mutex cid uint64 opts clientOpts conn net.Conn bw *bufio.Writer srv *Server subs *hashmap.HashMap pcd map[*client]struct{} cstats parseState } func (c *client) String() string { return fmt.Sprintf("cid:%d", c.cid) } type cstats struct { nr int nb int nm int } type subscription struct { client *client subject []byte queue []byte sid []byte nm int64 max int64 } type clientOpts struct { Verbose bool `json:"verbose"` Pedantic bool `json:"pedantic"` SslRequired bool `json:"ssl_required"` } func init() { rand.Seed(time.Now().UnixNano()) } func (c *client) readLoop() { b := make([]byte, defaultBufSize) // log.Printf("b len = %d, cap = %d\n", len(b), cap(b)) for { n, err := c.conn.Read(b) if err != nil { c.closeConnection() return } if err := c.parse(b[:n]); err != nil { Logf("Parse Error: %v\n", err) c.closeConnection() return } // Check pending clients for flush. for cp, _ := range c.pcd { // Flush those in the set cp.mu.Lock() err := cp.bw.Flush() cp.mu.Unlock() if err != nil { // FIXME, close connection? Logf("Error flushing client connection: %v\n", err) } delete(c.pcd, cp) } } } func (c *client) traceMsg(msg []byte) { pm := fmt.Sprintf("Processing msg: %d", c.nm) opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg)} Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) } func (c *client) traceOp(op string, arg []byte) { if !trace { return } opa := []interface{}{fmt.Sprintf("%s OP", op)} if arg != nil { opa = append(opa, fmt.Sprintf("%s %s", op, string(arg))) } Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) } func (c *client) processConnect(arg []byte) error { c.traceOp("CONNECT", arg) // FIXME, check err return json.Unmarshal(arg, &c.opts) } var pongResp = []byte(fmt.Sprintf("PONG%s", CR_LF)) func (c *client) processPing() { c.traceOp("PING", nil) if c.conn == nil { return } c.mu.Lock() c.bw.Write(pongResp) err := c.bw.Flush() c.mu.Unlock() if err != nil { // FIXME, close connection? Logf("Error flushing client connection [PING]: %v\n", err) } } const argsLenMax = 3 func (c *client) processPub(arg []byte) error { if trace { c.traceOp("PUB", arg) } // Unroll splitArgs to avoid runtime/heap issues a := [argsLenMax][]byte{} args := a[:0] start := -1 for i, b := range arg { switch b { case ' ', '\t', '\r', '\n': if start >= 0 { args = append(args, arg[start:i]) start = -1 } default: if start < 0 { start = i } } } if start >= 0 { args = append(args, arg[start:]) } switch len(args) { case 2: c.pa.subject = args[0] c.pa.reply = nil c.pa.size = parseSize(args[1]) c.pa.szb = args[1] case 3: c.pa.subject = args[0] c.pa.reply = args[1] c.pa.size = parseSize(args[2]) c.pa.szb = args[2] default: return fmt.Errorf("processPub Parse Error: '%s'", arg) } if c.pa.size < 0 { return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg) } return nil } func splitArg(arg []byte) [][]byte { a := [argsLenMax][]byte{} args := a[:0] start := -1 for i, b := range arg { switch b { case ' ', '\t', '\r', '\n': if start >= 0 { args = append(args, arg[start:i]) start = -1 } default: if start < 0 { start = i } } } if start >= 0 { args = append(args, arg[start:]) } return args } func (c *client) processSub(argo []byte) error { c.traceOp("SUB", argo) // Copy so we do not reference a potentially large buffer arg := make([]byte, len(argo)) copy(arg, argo) args := splitArg(arg) sub := &subscription{client: c} switch len(args) { case 2: sub.subject = args[0] sub.queue = nil sub.sid = args[1] case 3: sub.subject = args[0] sub.queue = args[1] sub.sid = args[2] default: return fmt.Errorf("processSub Parse Error: '%s'", arg) } c.mu.Lock() defer c.mu.Unlock() c.subs.Set(sub.sid, sub) if c.srv != nil { c.srv.sl.Insert(sub.subject, sub) } return nil } func (c *client) unsubscribe(sub *subscription) { c.mu.Lock() defer c.mu.Unlock() if sub.max > 0 && sub.nm <= sub.max { return } c.traceOp("DELSUB", sub.sid) c.subs.Remove(sub.sid) if c.srv != nil { c.srv.sl.Remove(sub.subject, sub) } } func (c *client) processUnsub(arg []byte) error { c.traceOp("UNSUB", arg) args := splitArg(arg) var sid []byte max := -1 switch len(args) { case 1: sid = args[0] case 2: sid = args[0] max = parseSize(args[1]) default: return fmt.Errorf("processUnsub Parse Error: '%s'", arg) } if sub, ok := (c.subs.Get(sid)).(*subscription); ok { if max > 0 { sub.max = int64(max) } c.unsubscribe(sub) } return nil } func (c *client) msgHeader(mh []byte, sub *subscription) []byte { mh = append(mh, sub.sid...) mh = append(mh, ' ') if c.pa.reply != nil { mh = append(mh, c.pa.reply...) mh = append(mh, ' ') } mh = append(mh, c.pa.szb...) mh = append(mh, "\r\n"...) return mh } // Used to treat map as efficient set type empty struct{} var needFlush = empty{} func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { if sub.client == nil || sub.client.conn == nil { return } client := sub.client client.mu.Lock() sub.nm++ if sub.max > 0 && sub.nm > sub.max { client.mu.Unlock() client.unsubscribe(sub) return } // Deliver to the client. _, err := client.bw.Write(mh) if err != nil { Logf("Error writing msg header: %v\n", err) } _, err = client.bw.Write(msg) if err != nil { Logf("Error writing msg: %v\n", err) } // FIXME, this is already attached to original message _, err = client.bw.WriteString(CR_LF) if err != nil { Logf("Error writing CRLF: %v\n", err) } client.mu.Unlock() c.pcd[sub.client] = needFlush } // TODO // Block pub goroutine on bufio locked write buffer with // go flusher routine. Single for all connections? func (c *client) processMsg(msg []byte) { c.nm++ if trace { c.traceMsg(msg) } if c.srv == nil { return } qsubsA := [32]*subscription{} qsubs := qsubsA[:0] scratch := [512]byte{} msgh := scratch[:0] r := c.srv.sl.Match(c.pa.subject) if len(r) <= 0 { return } // msg header // FIXME, put MSG into initializer msgh = append(msgh, "MSG "...) msgh = append(msgh, c.pa.subject...) msgh = append(msgh, ' ') si := len(msgh) for _, v := range r { sub := v.(*subscription) if sub.queue != nil { qsubs = append(qsubs, sub) continue } mh := c.msgHeader(msgh[:si], sub) c.deliverMsg(sub, mh, msg) } if len(qsubs) > 0 { index := rand.Int() % len(qsubs) sub := qsubs[index] mh := c.msgHeader(msgh[:si], sub) c.deliverMsg(sub, mh, msg) } } func (c *client) closeConnection() { if c.conn == nil { return } Debug("Client connection closed", clientConnStr(c.conn), c.cid) c.mu.Lock() c.bw.Flush() c.conn.Close() c.conn = nil subs := c.subs.All() c.mu.Unlock() if c.srv != nil { for _, s := range subs { if sub, ok := s.(*subscription); ok { c.srv.sl.Remove(sub.subject, sub) } } } /* log.Printf("Sublist Stats: %+v\n", c.srv.sl.Stats()) if c.nr > 0 { log.Printf("stats: %d %d %d\n", c.nr, c.nb, c.nm) log.Printf("bytes per read: %d\n", c.nb/c.nr) log.Printf("msgs per read: %d\n", c.nm/c.nr) } */ }