diff --git a/server/client.go b/server/client.go index 7ab19a92..4e4dd769 100644 --- a/server/client.go +++ b/server/client.go @@ -63,11 +63,11 @@ func init() { func (c *client) readLoop() { b := make([]byte, defaultBufSize) -// log.Printf("b len = %d, cap = %d\n", len(b), cap(b)) + // log.Printf("b len = %d, cap = %d\n", len(b), cap(b)) for { n, err := c.conn.Read(b) if err != nil { -// log.Printf("Encountered a read error: %v\n", err) + // log.Printf("Encountered a read error: %v\n", err) c.closeConnection() return } @@ -80,7 +80,8 @@ func (c *client) readLoop() { } func (c *client) traceMsg(msg []byte) { - opa := []interface{}{"Processing msg", string(c.pa.subject), string(c.pa.reply), string(msg)} + pm := fmt.Sprintf("Processing msg: %d", c.nm) + opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg)} Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid)) } @@ -180,13 +181,10 @@ func (c *client) processSub(argo []byte) error { return fmt.Errorf("processSub Parse Error: '%s'", arg) } - // log.Printf("sub.subject = '%s'\n", sub.subject) - // log.Printf("sub.queue = '%s'\n", sub.queue) - // log.Printf("sub.sid = '%s'\n", sub.sid) + c.mu.Lock() + defer c.mu.Unlock() - if c.subs != nil { - c.subs.Set(sub.sid, sub) - } + c.subs.Set(sub.sid, sub) if c.srv != nil { c.srv.sl.Insert(sub.subject, sub) } @@ -194,9 +192,12 @@ func (c *client) processSub(argo []byte) error { } func (c *client) unsubscribe(sub *subscription) { + c.mu.Lock() + defer c.mu.Unlock() if sub.max > 0 && sub.nm <= sub.max { return } + c.traceOp("DELSUB", sub.sid) c.subs.Remove(sub.sid) if c.srv != nil { c.srv.sl.Remove(sub.subject, sub) @@ -218,7 +219,7 @@ func (c *client) processUnsub(arg []byte) error { default: return fmt.Errorf("processUnsub Parse Error: '%s'", arg) } - if sub, ok := (c.subs.Get(sid)).(*subscription); ok { + if sub, ok := (c.subs.Get(sid)).(*subscription); ok { if max > 0 { sub.max = int64(max) } @@ -240,16 +241,17 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte { } func (sub *subscription) deliverMsg(mh, msg []byte) { - sub.nm++ if sub.client == nil || sub.client.conn == nil { return } + sub.client.mu.Lock() + sub.nm++ if sub.max > 0 && sub.nm > sub.max { + sub.client.mu.Unlock() sub.client.unsubscribe(sub) return } - sub.client.mu.Lock() sub.client.bw.Write(mh) sub.client.bw.Write(msg) sub.client.bw.WriteString("\r\n") @@ -263,8 +265,8 @@ func (sub *subscription) deliverMsg(mh, msg []byte) { // go flusher routine. Single for all connections? func (c *client) processMsg(msg []byte) { - c.traceMsg(msg) c.nm++ + c.traceMsg(msg) if c.srv == nil { return } @@ -315,8 +317,9 @@ func (c *client) closeConnection() { if c.srv != nil { subs := c.subs.All() for _, s := range subs { - sub := s.(*subscription) - c.srv.sl.Remove(sub.subject, sub) + if sub, ok := s.(*subscription); ok { + c.srv.sl.Remove(sub.subject, sub) + } } }