From 6eeb9d2361f05845a045176f94e000507d6ea4a9 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 3 Feb 2021 20:12:06 -0700 Subject: [PATCH] Fixed some issues with JS consumers - Should check if there is interest at top of deleteNotActive timer callback. - Use RLock/RUnlock for no interest check - Remove some checks in cluster mode regarding internal stream/consumers Signed-off-by: Ivan Kozlovic --- server/consumer.go | 8 ++++++-- server/jetstream_cluster.go | 10 ++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 08455b9a..1e105c34 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -895,6 +895,10 @@ func (o *Consumer) updateDeliveryInterest(localInterest bool) { } func (o *Consumer) deleteNotActive() { + // Need to check again if there is not an interest now that the timer fires. + if !o.hasNoLocalInterest() { + return + } o.mu.RLock() if o.mset == nil { o.mu.RUnlock() @@ -2369,9 +2373,9 @@ func (o *Consumer) Active() bool { // hasNoLocalInterest return true if we have no local interest. func (o *Consumer) hasNoLocalInterest() bool { - o.mu.Lock() + o.mu.RLock() rr := o.acc.sl.Match(o.config.DeliverSubject) - o.mu.Unlock() + o.mu.RUnlock() return len(rr.psubs)+len(rr.qsubs) == 0 } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3130a298..bb6048c6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1852,11 +1852,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, if err != nil { resp.Error = jsNotFoundError(err) } else if mset != nil { - if mset.Config().internal { - err = errors.New("not allowed to delete internal stream") - } else { - err = mset.stop(true, wasLeader) - } + err = mset.stop(true, wasLeader) } if sa.Group.node != nil { @@ -2096,9 +2092,7 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb if err != nil { resp.Error = jsNotFoundError(err) } else if mset != nil { - if mset.Config().internal { - err = errors.New("not allowed to delete internal consumer") - } else if o := mset.LookupConsumer(ca.Name); o != nil { + if o := mset.LookupConsumer(ca.Name); o != nil { err = o.stop(true, true, wasLeader) } else { resp.Error = jsNoConsumerErr