From 8c9a30b8a189bacae4497be411ad5f5b13a8cf9f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 28 May 2020 12:25:40 -0700 Subject: [PATCH] Avoid deadlock by releasing consumer lock Signed-off-by: Derek Collison --- go.mod | 4 +--- server/consumer.go | 38 ++++++++++++++++++-------------------- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index bdb274a0..4e50e426 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,9 @@ module github.com/nats-io/nats-server/v2 -go 1.14 - require ( + github.com/minio/highwayhash v1.0.0 github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7 github.com/nats-io/nats.go v1.10.0 - github.com/minio/highwayhash v1.0.0 github.com/nats-io/nkeys v0.1.4 github.com/nats-io/nuid v1.0.1 golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 diff --git a/server/consumer.go b/server/consumer.go index 1f16181a..8ba5df6e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -483,6 +483,17 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { return o, nil } +// We need to make sure we protect access to the sendq. +// Do all advisory sends here. +// Lock should be held on entry but will be released. +func (o *Consumer) sendAdvisory(subj string, msg []byte) { + if o.mset != nil && o.mset.sendq != nil { + o.mu.Unlock() + o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, msg, nil, 0} + o.mu.Lock() + } +} + func (o *Consumer) sendDeleteAdvisoryLocked() { e := JSConsumerActionAdvisory{ TypedEvent: TypedEvent{ @@ -500,11 +511,8 @@ func (o *Consumer) sendDeleteAdvisoryLocked() { return } - // can be nil during shutdown, locks are held in the caller - if o.mset != nil && o.mset.sendq != nil { - subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name - o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0} - } + subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name + o.sendAdvisory(subj, j) } func (o *Consumer) sendCreateAdvisory() { @@ -527,10 +535,8 @@ func (o *Consumer) sendCreateAdvisory() { return } - if o.mset != nil && o.mset.sendq != nil { - subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name - o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0} - } + subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name + o.sendAdvisory(subj, j) } // Created returns created time. @@ -733,9 +739,7 @@ func (o *Consumer) processTerm(sseq, dseq, dcount uint64) { } subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name - if o.mset != nil && o.mset.sendq != nil { - o.mset.sendq <- &jsPubMsg{subj, subj, _EMPTY_, j, nil, 0} - } + o.sendAdvisory(subj, j) } // This will restore the state from disk. @@ -885,10 +889,7 @@ func (o *Consumer) sampleAck(sseq, dseq, dcount uint64) { return } - // can be nil during server Shutdown but with some ACKs in flight internally, lock held in caller - if o.mset != nil && o.mset.sendq != nil { - o.mset.sendq <- &jsPubMsg{o.ackEventT, o.ackEventT, _EMPTY_, j, nil, 0} - } + o.sendAdvisory(o.ackEventT, j) } // Process an ack for a message. @@ -1041,10 +1042,7 @@ func (o *Consumer) notifyDeliveryExceeded(sseq, dcount uint64) { return } - // can be nil during shutdown, locks are held in the caller - if o.mset != nil && o.mset.sendq != nil { - o.mset.sendq <- &jsPubMsg{o.deliveryExcEventT, o.deliveryExcEventT, _EMPTY_, j, nil, 0} - } + o.sendAdvisory(o.deliveryExcEventT, j) } // Check to see if the candidate subject matches a filter if its present.