From bcb777150a3dc6aa23ed790b6ed2c68834bc850b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 22 Nov 2020 10:24:32 -0800 Subject: [PATCH] Fix race Signed-off-by: Derek Collison --- server/consumer.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index e22f7864..db6f9499 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -322,6 +322,15 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { } } + // Grab the client, account and server reference. + c := mset.client + if c == nil { + return nil, fmt.Errorf("stream not valid") + } + c.mu.Lock() + s, a := c.srv, c.acc + c.mu.Unlock() + // Hold mset lock here. mset.mu.Lock() @@ -387,6 +396,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { // Set name, which will be durable name if set, otherwise we create one at random. o := &Consumer{ mset: mset, + acc: a, config: *config, dsubj: config.DeliverSubject, active: true, @@ -459,14 +469,6 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) { o.selectStartingSeqNo() // Now register with mset and create the ack subscription. - c := mset.client - if c == nil { - mset.mu.Unlock() - return nil, fmt.Errorf("stream not valid") - } - s, a := c.srv, c.acc - o.acc = a - // Check if we already have this one registered. if eo, ok := mset.consumers[o.name]; ok { mset.mu.Unlock()