diff --git a/server/client.go b/server/client.go index 01006d99..34eaeb7e 100644 --- a/server/client.go +++ b/server/client.go @@ -6,7 +6,6 @@ import ( "bufio" "encoding/json" "fmt" - "log" "math/rand" "net" "sync" @@ -25,9 +24,9 @@ type client struct { opts clientOpts conn net.Conn bw *bufio.Writer - br *bufio.Reader srv *Server subs *hashmap.HashMap + pcd map[*client]struct{} cstats parseState } @@ -67,15 +66,26 @@ func (c *client) readLoop() { for { n, err := c.conn.Read(b) if err != nil { - // log.Printf("Encountered a read error: %v\n", err) c.closeConnection() return } if err := c.parse(b[:n]); err != nil { - log.Printf("Parse Error: %v\n", err) + Logf("Parse Error: %v\n", err) c.closeConnection() return } + // Check pending clients for flush. + for cp, _ := range c.pcd { + // Flush those in the set + cp.mu.Lock() + err := cp.bw.Flush() + cp.mu.Unlock() + if err != nil { + // FIXME, close connection? + Logf("Error flushing client connection: %v\n", err) + } + delete(c.pcd, cp) + } } } @@ -109,15 +119,44 @@ func (c *client) processPing() { if c.conn == nil { return } - // FIXME, check err - c.conn.Write(pongResp) + c.mu.Lock() + c.bw.Write(pongResp) + err := c.bw.Flush() + c.mu.Unlock() + if err != nil { + // FIXME, close connection? + Logf("Error flushing client connection [PING]: %v\n", err) + } } const argsLenMax = 3 func (c *client) processPub(arg []byte) error { - c.traceOp("PUB", arg) - args := splitArg(arg) + if trace { + c.traceOp("PUB", arg) + } + + // Unroll splitArgs to avoid runtime/heap issues + a := [argsLenMax][]byte{} + args := a[:0] + start := -1 + for i, b := range arg { + switch b { + case ' ', '\t', '\r', '\n': + if start >= 0 { + args = append(args, arg[start:i]) + start = -1 + } + default: + if start < 0 { + start = i + } + } + } + if start >= 0 { + args = append(args, arg[start:]) + } + switch len(args) { case 2: c.pa.subject = args[0] @@ -240,24 +279,38 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte { return mh } -func (sub *subscription) deliverMsg(mh, msg []byte) { +// Used to treat map as efficient set +type empty struct{} +var needFlush = empty{} + +func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { if sub.client == nil || sub.client.conn == nil { return } - sub.client.mu.Lock() + client := sub.client + client.mu.Lock() sub.nm++ if sub.max > 0 && sub.nm > sub.max { - sub.client.mu.Unlock() - sub.client.unsubscribe(sub) + client.mu.Unlock() + client.unsubscribe(sub) return } - - sub.client.bw.Write(mh) - sub.client.bw.Write(msg) - sub.client.bw.WriteString("\r\n") - // FIXME: Make efficient with flusher.. - sub.client.bw.Flush() - sub.client.mu.Unlock() + // Deliver to the client. + _, err := client.bw.Write(mh) + if err != nil { + Logf("Error writing msg header: %v\n", err) + } + _, err = client.bw.Write(msg) + if err != nil { + Logf("Error writing msg: %v\n", err) + } + // FIXME, this is already attached to original message + _, err = client.bw.WriteString(CR_LF) + if err != nil { + Logf("Error writing CRLF: %v\n", err) + } + client.mu.Unlock() + c.pcd[sub.client] = needFlush } // TODO @@ -284,6 +337,7 @@ func (c *client) processMsg(msg []byte) { } // msg header + // FIXME, put MSG into initializer msgh = append(msgh, "MSG "...) msgh = append(msgh, c.pa.subject...) msgh = append(msgh, ' ') @@ -296,13 +350,13 @@ func (c *client) processMsg(msg []byte) { continue } mh := c.msgHeader(msgh[:si], sub) - sub.deliverMsg(mh, msg) + c.deliverMsg(sub, mh, msg) } if len(qsubs) > 0 { index := rand.Int() % len(qsubs) sub := qsubs[index] mh := c.msgHeader(msgh[:si], sub) - sub.deliverMsg(mh, msg) + c.deliverMsg(sub, mh, msg) } } @@ -312,12 +366,14 @@ func (c *client) closeConnection() { } Debug("Client connection closed", clientConnStr(c.conn), c.cid) - // c.bw.Flush() + c.mu.Lock() + c.bw.Flush() c.conn.Close() c.conn = nil + subs := c.subs.All() + c.mu.Unlock() if c.srv != nil { - subs := c.subs.All() for _, s := range subs { if sub, ok := s.(*subscription); ok { c.srv.sl.Remove(sub.subject, sub)