Merge pull request #1884 from nats-io/jsc_del_cons_fix

Fixed some issues with JS consumers
This commit is contained in:
Ivan Kozlovic
2021-02-03 20:43:02 -07:00
committed by GitHub
2 changed files with 8 additions and 10 deletions

View File

@@ -895,6 +895,10 @@ func (o *Consumer) updateDeliveryInterest(localInterest bool) {
}
func (o *Consumer) deleteNotActive() {
// Need to check again if there is not an interest now that the timer fires.
if !o.hasNoLocalInterest() {
return
}
o.mu.RLock()
if o.mset == nil {
o.mu.RUnlock()
@@ -2369,9 +2373,9 @@ func (o *Consumer) Active() bool {
// hasNoLocalInterest return true if we have no local interest.
func (o *Consumer) hasNoLocalInterest() bool {
o.mu.Lock()
o.mu.RLock()
rr := o.acc.sl.Match(o.config.DeliverSubject)
o.mu.Unlock()
o.mu.RUnlock()
return len(rr.psubs)+len(rr.qsubs) == 0
}

View File

@@ -1852,11 +1852,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
if err != nil {
resp.Error = jsNotFoundError(err)
} else if mset != nil {
if mset.Config().internal {
err = errors.New("not allowed to delete internal stream")
} else {
err = mset.stop(true, wasLeader)
}
err = mset.stop(true, wasLeader)
}
if sa.Group.node != nil {
@@ -2096,9 +2092,7 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
if err != nil {
resp.Error = jsNotFoundError(err)
} else if mset != nil {
if mset.Config().internal {
err = errors.New("not allowed to delete internal consumer")
} else if o := mset.LookupConsumer(ca.Name); o != nil {
if o := mset.LookupConsumer(ca.Name); o != nil {
err = o.stop(true, true, wasLeader)
} else {
resp.Error = jsNoConsumerErr