diff --git a/server/client.go b/server/client.go index 2cce18ad..d9537f0b 100644 --- a/server/client.go +++ b/server/client.go @@ -313,7 +313,9 @@ func (c *client) processSub(argo []byte) (err error) { func (c *client) unsubscribe(sub *subscription) { c.mu.Lock() defer c.mu.Unlock() - if sub.max > 0 && sub.nm <= sub.max { + if sub.max > 0 && sub.nm < sub.max { + Debugf("Deferring actual UNSUB(%s): %d max, %d received\n", + string(sub.subject), sub.max, sub.nm) return } c.traceOp("DELSUB", sub.sid) @@ -341,6 +343,9 @@ func (c *client) processUnsub(arg []byte) error { if sub, ok := (c.subs.Get(sid)).(*subscription); ok { if max > 0 { sub.max = int64(max) + } else { + // Clear it here to override + sub.max = 0 } c.unsubscribe(sub) } @@ -368,15 +373,28 @@ type empty struct{} var needFlush = empty{} func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { - if sub.client == nil || sub.client.conn == nil { + if sub.client == nil { return } client := sub.client client.mu.Lock() sub.nm++ - if sub.max > 0 && sub.nm > sub.max { + // Check if we should auto-unsubscribe. + if sub.max > 0 { + // If we are at the exact number, unsubscribe but + // still process the message in hand, otherwise + // unsubscribe and drop message on the floor. + if sub.nm == sub.max { + defer client.unsubscribe(sub) + } else if sub.nm > sub.max { + client.mu.Unlock() + client.unsubscribe(sub) + return + } + } + + if sub.client.conn == nil { client.mu.Unlock() - client.unsubscribe(sub) return }