mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #1428 from nats-io/dl
Avoid deadlock by releasing consumer lock.
This commit is contained in:
@@ -483,6 +483,17 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
|
||||
return o, nil
|
||||
}
|
||||
|
||||
// We need to make sure we protect access to the sendq.
|
||||
// Do all advisory sends here.
|
||||
// Lock should be held on entry but will be released.
|
||||
func (o *Consumer) sendAdvisory(subj string, msg []byte) {
|
||||
if o.mset != nil && o.mset.sendq != nil {
|
||||
o.mu.Unlock()
|
||||
o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, msg, nil, 0}
|
||||
o.mu.Lock()
|
||||
}
|
||||
}
|
||||
|
||||
func (o *Consumer) sendDeleteAdvisoryLocked() {
|
||||
e := JSConsumerActionAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
@@ -500,11 +511,8 @@ func (o *Consumer) sendDeleteAdvisoryLocked() {
|
||||
return
|
||||
}
|
||||
|
||||
// can be nil during shutdown, locks are held in the caller
|
||||
if o.mset != nil && o.mset.sendq != nil {
|
||||
subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name
|
||||
o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0}
|
||||
}
|
||||
subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name
|
||||
o.sendAdvisory(subj, j)
|
||||
}
|
||||
|
||||
func (o *Consumer) sendCreateAdvisory() {
|
||||
@@ -527,10 +535,8 @@ func (o *Consumer) sendCreateAdvisory() {
|
||||
return
|
||||
}
|
||||
|
||||
if o.mset != nil && o.mset.sendq != nil {
|
||||
subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name
|
||||
o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0}
|
||||
}
|
||||
subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name
|
||||
o.sendAdvisory(subj, j)
|
||||
}
|
||||
|
||||
// Created returns created time.
|
||||
@@ -733,9 +739,7 @@ func (o *Consumer) processTerm(sseq, dseq, dcount uint64) {
|
||||
}
|
||||
|
||||
subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name
|
||||
if o.mset != nil && o.mset.sendq != nil {
|
||||
o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0}
|
||||
}
|
||||
o.sendAdvisory(subj, j)
|
||||
}
|
||||
|
||||
// This will restore the state from disk.
|
||||
@@ -885,10 +889,7 @@ func (o *Consumer) sampleAck(sseq, dseq, dcount uint64) {
|
||||
return
|
||||
}
|
||||
|
||||
// can be nil during server Shutdown but with some ACKs in flight internally, lock held in caller
|
||||
if o.mset != nil && o.mset.sendq != nil {
|
||||
o.mset.sendq <- &jsPubMsg{o.ackEventT, o.ackEventT, _EMPTY_, j, nil, 0}
|
||||
}
|
||||
o.sendAdvisory(o.ackEventT, j)
|
||||
}
|
||||
|
||||
// Process an ack for a message.
|
||||
@@ -1041,10 +1042,7 @@ func (o *Consumer) notifyDeliveryExceeded(sseq, dcount uint64) {
|
||||
return
|
||||
}
|
||||
|
||||
// can be nil during shutdown, locks are held in the caller
|
||||
if o.mset != nil && o.mset.sendq != nil {
|
||||
o.mset.sendq <- &jsPubMsg{o.deliveryExcEventT, o.deliveryExcEventT, _EMPTY_, j, nil, 0}
|
||||
}
|
||||
o.sendAdvisory(o.deliveryExcEventT, j)
|
||||
}
|
||||
|
||||
// Check to see if the candidate subject matches a filter if its present.
|
||||
|
||||
Reference in New Issue
Block a user