diff --git a/server/sublist.go b/server/sublist.go index 65fd0024..fa0a4d0a 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -1,4 +1,4 @@ -// Copyright 2016-2019 The NATS Authors +// Copyright 2016-2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -461,6 +461,7 @@ func (s *Sublist) removeFromCache(subject string, sub *subscription) { } return } + // Wildcard here. s.cache.Range(func(k, v interface{}) bool { key := k.(string) if matchLiteral(key, subject) { @@ -659,7 +660,7 @@ type lnt struct { } // Raw low level remove, can do batches with lock held outside. -func (s *Sublist) remove(sub *subscription, shouldLock bool) error { +func (s *Sublist) remove(sub *subscription, shouldLock bool, doCacheUpdates bool) error { subject := string(sub.subject) tsa := [32]string{} tokens := tsa[:0] @@ -728,8 +729,10 @@ func (s *Sublist) remove(sub *subscription, shouldLock bool) error { l.pruneNode(n, t) } } - s.removeFromCache(subject, sub) - atomic.AddUint64(&s.genid, 1) + if doCacheUpdates { + s.removeFromCache(subject, sub) + atomic.AddUint64(&s.genid, 1) + } if s.notify != nil && last && len(s.notify.remove) > 0 { s.chkForRemoveNotification(subject, !haswc) @@ -740,19 +743,33 @@ func (s *Sublist) remove(sub *subscription, shouldLock bool) error { // Remove will remove a subscription. func (s *Sublist) Remove(sub *subscription) error { - return s.remove(sub, true) + return s.remove(sub, true, true) } // RemoveBatch will remove a list of subscriptions. func (s *Sublist) RemoveBatch(subs []*subscription) error { + if len(subs) == 0 { + return nil + } + s.Lock() defer s.Unlock() + // TODO(dlc) - We could try to be smarter here for a client going away but the account + // has a large number of subscriptions compared to this client. Quick and dirty testing + // though said just disabling all the time best for now. + + // Turn off our cache. + s.cache = nil + atomic.StoreInt32(&s.cacheNum, 0) for _, sub := range subs { - if err := s.remove(sub, false); err != nil { + if err := s.remove(sub, false, false); err != nil { return err } } + // Turn caching back on here. + atomic.AddUint64(&s.genid, 1) + s.cache = &sync.Map{} return nil }