diff --git a/server/consumer.go b/server/consumer.go index 1f16181a..8ba5df6e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -483,6 +483,17 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { return o, nil } +// We need to make sure we protect access to the sendq. +// Do all advisory sends here. +// Lock should be held on entry but will be released. +func (o *Consumer) sendAdvisory(subj string, msg []byte) { + if o.mset != nil && o.mset.sendq != nil { + o.mu.Unlock() + o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, msg, nil, 0} + o.mu.Lock() + } +} + func (o *Consumer) sendDeleteAdvisoryLocked() { e := JSConsumerActionAdvisory{ TypedEvent: TypedEvent{ @@ -500,11 +511,8 @@ func (o *Consumer) sendDeleteAdvisoryLocked() { return } - // can be nil during shutdown, locks are held in the caller - if o.mset != nil && o.mset.sendq != nil { - subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name - o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0} - } + subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name + o.sendAdvisory(subj, j) } func (o *Consumer) sendCreateAdvisory() { @@ -527,10 +535,8 @@ func (o *Consumer) sendCreateAdvisory() { return } - if o.mset != nil && o.mset.sendq != nil { - subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name - o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0} - } + subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name + o.sendAdvisory(subj, j) } // Created returns created time. @@ -733,9 +739,7 @@ func (o *Consumer) processTerm(sseq, dseq, dcount uint64) { } subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name - if o.mset != nil && o.mset.sendq != nil { - o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0} - } + o.sendAdvisory(subj, j) } // This will restore the state from disk. @@ -885,10 +889,7 @@ func (o *Consumer) sampleAck(sseq, dseq, dcount uint64) { return } - // can be nil during server Shutdown but with some ACKs in flight internally, lock held in caller - if o.mset != nil && o.mset.sendq != nil { - o.mset.sendq <- &jsPubMsg{o.ackEventT, o.ackEventT, _EMPTY_, j, nil, 0} - } + o.sendAdvisory(o.ackEventT, j) } // Process an ack for a message. @@ -1041,10 +1042,7 @@ func (o *Consumer) notifyDeliveryExceeded(sseq, dcount uint64) { return } - // can be nil during shutdown, locks are held in the caller - if o.mset != nil && o.mset.sendq != nil { - o.mset.sendq <- &jsPubMsg{o.deliveryExcEventT, o.deliveryExcEventT, _EMPTY_, j, nil, 0} - } + o.sendAdvisory(o.deliveryExcEventT, j) } // Check to see if the candidate subject matches a filter if its present.