Improve RemoveBatch by disabling cache

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-05-03 12:04:53 -07:00
parent 9a702c2bc7
commit acc8da8b6e

View File

@@ -1,4 +1,4 @@
// Copyright 2016-2019 The NATS Authors
// Copyright 2016-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -461,6 +461,7 @@ func (s *Sublist) removeFromCache(subject string, sub *subscription) {
}
return
}
// Wildcard here.
s.cache.Range(func(k, v interface{}) bool {
key := k.(string)
if matchLiteral(key, subject) {
@@ -659,7 +660,7 @@ type lnt struct {
}
// Raw low level remove, can do batches with lock held outside.
func (s *Sublist) remove(sub *subscription, shouldLock bool) error {
func (s *Sublist) remove(sub *subscription, shouldLock bool, doCacheUpdates bool) error {
subject := string(sub.subject)
tsa := [32]string{}
tokens := tsa[:0]
@@ -728,8 +729,10 @@ func (s *Sublist) remove(sub *subscription, shouldLock bool) error {
l.pruneNode(n, t)
}
}
s.removeFromCache(subject, sub)
atomic.AddUint64(&s.genid, 1)
if doCacheUpdates {
s.removeFromCache(subject, sub)
atomic.AddUint64(&s.genid, 1)
}
if s.notify != nil && last && len(s.notify.remove) > 0 {
s.chkForRemoveNotification(subject, !haswc)
@@ -740,19 +743,33 @@ func (s *Sublist) remove(sub *subscription, shouldLock bool) error {
// Remove will remove a subscription.
func (s *Sublist) Remove(sub *subscription) error {
return s.remove(sub, true)
return s.remove(sub, true, true)
}
// RemoveBatch will remove a list of subscriptions.
func (s *Sublist) RemoveBatch(subs []*subscription) error {
if len(subs) == 0 {
return nil
}
s.Lock()
defer s.Unlock()
// TODO(dlc) - We could try to be smarter here for a client going away but the account
// has a large number of subscriptions compared to this client. Quick and dirty testing
// though said just disabling all the time best for now.
// Turn off our cache.
s.cache = nil
atomic.StoreInt32(&s.cacheNum, 0)
for _, sub := range subs {
if err := s.remove(sub, false); err != nil {
if err := s.remove(sub, false, false); err != nil {
return err
}
}
// Turn caching back on here.
atomic.AddUint64(&s.genid, 1)
s.cache = &sync.Map{}
return nil
}