mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fixed bugs handling edge cases with auto-unsubscribe
This commit is contained in:
@@ -313,7 +313,9 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
func (c *client) unsubscribe(sub *subscription) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if sub.max > 0 && sub.nm <= sub.max {
|
||||
if sub.max > 0 && sub.nm < sub.max {
|
||||
Debugf("Deferring actual UNSUB(%s): %d max, %d received\n",
|
||||
string(sub.subject), sub.max, sub.nm)
|
||||
return
|
||||
}
|
||||
c.traceOp("DELSUB", sub.sid)
|
||||
@@ -341,6 +343,9 @@ func (c *client) processUnsub(arg []byte) error {
|
||||
if sub, ok := (c.subs.Get(sid)).(*subscription); ok {
|
||||
if max > 0 {
|
||||
sub.max = int64(max)
|
||||
} else {
|
||||
// Clear it here to override
|
||||
sub.max = 0
|
||||
}
|
||||
c.unsubscribe(sub)
|
||||
}
|
||||
@@ -368,15 +373,28 @@ type empty struct{}
|
||||
var needFlush = empty{}
|
||||
|
||||
func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
|
||||
if sub.client == nil || sub.client.conn == nil {
|
||||
if sub.client == nil {
|
||||
return
|
||||
}
|
||||
client := sub.client
|
||||
client.mu.Lock()
|
||||
sub.nm++
|
||||
if sub.max > 0 && sub.nm > sub.max {
|
||||
// Check if we should auto-unsubscribe.
|
||||
if sub.max > 0 {
|
||||
// If we are at the exact number, unsubscribe but
|
||||
// still process the message in hand, otherwise
|
||||
// unsubscribe and drop message on the floor.
|
||||
if sub.nm == sub.max {
|
||||
defer client.unsubscribe(sub)
|
||||
} else if sub.nm > sub.max {
|
||||
client.mu.Unlock()
|
||||
client.unsubscribe(sub)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if sub.client.conn == nil {
|
||||
client.mu.Unlock()
|
||||
client.unsubscribe(sub)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user