Fixed Sublist.RemoveBatch to remove subs present, even if one isn't

I have seen cases, maybe due to previous issue with configuration
reload that would miss subscriptions in the sublist because
of the sublist swap, where we would attempt to remove subscriptions
by batch but some were not present. I would have expected that
all present subscriptions would still be removed, even if the
call overall returned an error.
This is now fixed and a test has been added demonstrating that
even on error, we remove all subscriptions that were present.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2023-05-03 15:17:08 -06:00
parent 95e4f2dfe1
commit 7afe76caf8
2 changed files with 31 additions and 3 deletions

View File

@@ -820,9 +820,13 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error {
// Turn off our cache if enabled.
wasEnabled := s.cache != nil
s.cache = nil
// We will try to remove all subscriptions but will report the first that caused
// an error. In other words, we don't bail out at the first error which would
// possibly leave a bunch of subscriptions that could have been removed.
var err error
for _, sub := range subs {
if err := s.remove(sub, false, false); err != nil {
return err
if lerr := s.remove(sub, false, false); lerr != nil && err == nil {
err = lerr
}
}
// Turn caching back on here.
@@ -830,7 +834,7 @@ func (s *Sublist) RemoveBatch(subs []*subscription) error {
if wasEnabled {
s.cache = make(map[string]*SublistResult)
}
return nil
return err
}
// pruneNode is used to prune an empty node from the tree.

View File

@@ -370,6 +370,30 @@ func TestSublistNoCacheRemoveBatch(t *testing.T) {
}
}
func TestSublistRemoveBatchWithError(t *testing.T) {
s := NewSublistNoCache()
sub1 := newSub("foo")
sub2 := newSub("bar")
sub3 := newSub("baz")
s.Insert(sub1)
s.Insert(sub2)
s.Insert(sub3)
subNotPresent := newSub("not.inserted")
// Try to remove all subs, but include the sub that has not been inserted.
err := s.RemoveBatch([]*subscription{subNotPresent, sub1, sub3})
// We expect an error to be returned, but sub1,2 and 3 to have been removed.
require_Error(t, err, ErrNotFound)
// Make sure that we have only sub2 present
verifyCount(s, 1, t)
r := s.Match("bar")
verifyLen(r.psubs, 1, t)
verifyMember(r.psubs, sub2, t)
r = s.Match("foo")
verifyLen(r.psubs, 0, t)
r = s.Match("baz")
verifyLen(r.psubs, 0, t)
}
func testSublistInvalidSubjectsInsert(t *testing.T, s *Sublist) {
// Insert, or subscriptions, can have wildcards, but not empty tokens,
// and can not have a FWC that is not the terminal token.