mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Added locking, removed spurious logging
This commit is contained in:
@@ -63,11 +63,11 @@ func init() {
|
||||
|
||||
func (c *client) readLoop() {
|
||||
b := make([]byte, defaultBufSize)
|
||||
// log.Printf("b len = %d, cap = %d\n", len(b), cap(b))
|
||||
// log.Printf("b len = %d, cap = %d\n", len(b), cap(b))
|
||||
for {
|
||||
n, err := c.conn.Read(b)
|
||||
if err != nil {
|
||||
// log.Printf("Encountered a read error: %v\n", err)
|
||||
// log.Printf("Encountered a read error: %v\n", err)
|
||||
c.closeConnection()
|
||||
return
|
||||
}
|
||||
@@ -80,7 +80,8 @@ func (c *client) readLoop() {
|
||||
}
|
||||
|
||||
func (c *client) traceMsg(msg []byte) {
|
||||
opa := []interface{}{"Processing msg", string(c.pa.subject), string(c.pa.reply), string(msg)}
|
||||
pm := fmt.Sprintf("Processing msg: %d", c.nm)
|
||||
opa := []interface{}{pm, string(c.pa.subject), string(c.pa.reply), string(msg)}
|
||||
Trace(logStr(opa), fmt.Sprintf("c: %d", c.cid))
|
||||
}
|
||||
|
||||
@@ -180,13 +181,10 @@ func (c *client) processSub(argo []byte) error {
|
||||
return fmt.Errorf("processSub Parse Error: '%s'", arg)
|
||||
}
|
||||
|
||||
// log.Printf("sub.subject = '%s'\n", sub.subject)
|
||||
// log.Printf("sub.queue = '%s'\n", sub.queue)
|
||||
// log.Printf("sub.sid = '%s'\n", sub.sid)
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.subs != nil {
|
||||
c.subs.Set(sub.sid, sub)
|
||||
}
|
||||
c.subs.Set(sub.sid, sub)
|
||||
if c.srv != nil {
|
||||
c.srv.sl.Insert(sub.subject, sub)
|
||||
}
|
||||
@@ -194,9 +192,12 @@ func (c *client) processSub(argo []byte) error {
|
||||
}
|
||||
|
||||
func (c *client) unsubscribe(sub *subscription) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if sub.max > 0 && sub.nm <= sub.max {
|
||||
return
|
||||
}
|
||||
c.traceOp("DELSUB", sub.sid)
|
||||
c.subs.Remove(sub.sid)
|
||||
if c.srv != nil {
|
||||
c.srv.sl.Remove(sub.subject, sub)
|
||||
@@ -218,7 +219,7 @@ func (c *client) processUnsub(arg []byte) error {
|
||||
default:
|
||||
return fmt.Errorf("processUnsub Parse Error: '%s'", arg)
|
||||
}
|
||||
if sub, ok := (c.subs.Get(sid)).(*subscription); ok {
|
||||
if sub, ok := (c.subs.Get(sid)).(*subscription); ok {
|
||||
if max > 0 {
|
||||
sub.max = int64(max)
|
||||
}
|
||||
@@ -240,16 +241,17 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte {
|
||||
}
|
||||
|
||||
func (sub *subscription) deliverMsg(mh, msg []byte) {
|
||||
sub.nm++
|
||||
if sub.client == nil || sub.client.conn == nil {
|
||||
return
|
||||
}
|
||||
sub.client.mu.Lock()
|
||||
sub.nm++
|
||||
if sub.max > 0 && sub.nm > sub.max {
|
||||
sub.client.mu.Unlock()
|
||||
sub.client.unsubscribe(sub)
|
||||
return
|
||||
}
|
||||
|
||||
sub.client.mu.Lock()
|
||||
sub.client.bw.Write(mh)
|
||||
sub.client.bw.Write(msg)
|
||||
sub.client.bw.WriteString("\r\n")
|
||||
@@ -263,8 +265,8 @@ func (sub *subscription) deliverMsg(mh, msg []byte) {
|
||||
// go flusher routine. Single for all connections?
|
||||
|
||||
func (c *client) processMsg(msg []byte) {
|
||||
c.traceMsg(msg)
|
||||
c.nm++
|
||||
c.traceMsg(msg)
|
||||
if c.srv == nil {
|
||||
return
|
||||
}
|
||||
@@ -315,8 +317,9 @@ func (c *client) closeConnection() {
|
||||
if c.srv != nil {
|
||||
subs := c.subs.All()
|
||||
for _, s := range subs {
|
||||
sub := s.(*subscription)
|
||||
c.srv.sl.Remove(sub.subject, sub)
|
||||
if sub, ok := s.(*subscription); ok {
|
||||
c.srv.sl.Remove(sub.subject, sub)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user