diff --git a/server/consumer.go b/server/consumer.go index 08455b9a..1e105c34 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -895,6 +895,10 @@ func (o *Consumer) updateDeliveryInterest(localInterest bool) { } func (o *Consumer) deleteNotActive() { + // Need to check again if there is not an interest now that the timer fires. + if !o.hasNoLocalInterest() { + return + } o.mu.RLock() if o.mset == nil { o.mu.RUnlock() @@ -2369,9 +2373,9 @@ func (o *Consumer) Active() bool { // hasNoLocalInterest return true if we have no local interest. func (o *Consumer) hasNoLocalInterest() bool { - o.mu.Lock() + o.mu.RLock() rr := o.acc.sl.Match(o.config.DeliverSubject) - o.mu.Unlock() + o.mu.RUnlock() return len(rr.psubs)+len(rr.qsubs) == 0 } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3130a298..bb6048c6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1852,11 +1852,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, if err != nil { resp.Error = jsNotFoundError(err) } else if mset != nil { - if mset.Config().internal { - err = errors.New("not allowed to delete internal stream") - } else { - err = mset.stop(true, wasLeader) - } + err = mset.stop(true, wasLeader) } if sa.Group.node != nil { @@ -2096,9 +2092,7 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb if err != nil { resp.Error = jsNotFoundError(err) } else if mset != nil { - if mset.Config().internal { - err = errors.New("not allowed to delete internal consumer") - } else if o := mset.LookupConsumer(ca.Name); o != nil { + if o := mset.LookupConsumer(ca.Name); o != nil { err = o.stop(true, true, wasLeader) } else { resp.Error = jsNoConsumerErr