mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
[FIXED] Data RACE on Unsubscribe when client connection is closed
Resolves #331
This commit is contained in:
@@ -882,10 +882,12 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
|
||||
// unsubscribe and drop message on the floor.
|
||||
if sub.nm == sub.max {
|
||||
c.Debugf("Auto-unsubscribe limit of %d reached for sid '%s'\n", sub.max, string(sub.sid))
|
||||
defer client.unsubscribe(sub)
|
||||
// Due to defer, reverse the code order so that execution
|
||||
// is consistent with other cases where we unsubscribe.
|
||||
if shouldForward {
|
||||
defer client.srv.broadcastUnSubscribe(sub)
|
||||
}
|
||||
defer client.unsubscribe(sub)
|
||||
} else if sub.nm > sub.max {
|
||||
c.Debugf("Auto-unsubscribe limit [%d] exceeded\n", sub.max)
|
||||
client.mu.Unlock()
|
||||
|
||||
@@ -575,10 +575,12 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
|
||||
}
|
||||
rsid := routeSid(sub)
|
||||
maxStr := _EMPTY_
|
||||
sub.client.mu.Lock()
|
||||
// Set max if we have it set and have not tripped auto-unsubscribe
|
||||
if sub.max > 0 && sub.nm < sub.max {
|
||||
maxStr = fmt.Sprintf(" %d", sub.max)
|
||||
}
|
||||
sub.client.mu.Unlock()
|
||||
proto := fmt.Sprintf(unsubProto, rsid, maxStr)
|
||||
s.broadcastInterestToRoutes(proto)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user