Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-11-22 10:24:32 -08:00
parent b528b1f74e
commit bcb777150a

View File

@@ -322,6 +322,15 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
}
}
// Grab the client, account and server reference.
c := mset.client
if c == nil {
return nil, fmt.Errorf("stream not valid")
}
c.mu.Lock()
s, a := c.srv, c.acc
c.mu.Unlock()
// Hold mset lock here.
mset.mu.Lock()
@@ -387,6 +396,7 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
// Set name, which will be durable name if set, otherwise we create one at random.
o := &Consumer{
mset: mset,
acc: a,
config: *config,
dsubj: config.DeliverSubject,
active: true,
@@ -459,14 +469,6 @@ func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
o.selectStartingSeqNo()
// Now register with mset and create the ack subscription.
c := mset.client
if c == nil {
mset.mu.Unlock()
return nil, fmt.Errorf("stream not valid")
}
s, a := c.srv, c.acc
o.acc = a
// Check if we already have this one registered.
if eo, ok := mset.consumers[o.name]; ok {
mset.mu.Unlock()