From 668229fc1d55362afc833ea7bbfc7edabc4c0316 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 9 Mar 2018 19:05:33 -0700 Subject: [PATCH] [FIXED] RACE between sublist remove and go through match results This would manifest for instance when server tries to send messages to queue subscribers and a subscription is unsubsribed at the same time. Resolves #640 --- server/sublist.go | 9 +++-- server/sublist_test.go | 78 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/server/sublist.go b/server/sublist.go index 4df3927e..62bfdb8b 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -236,8 +236,13 @@ func (s *Sublist) Match(subject string) *SublistResult { s.Lock() matchLevel(s.root, tokens, result) + // Store in cache and return to caller a copy of the results to avoid + // race when sub is removed from sublist and caller walks through the + // results. + cr := copyResult(result) + // Add to our cache - s.cache[subject] = result + s.cache[subject] = cr // Bound the number of entries to sublistMaxCache if len(s.cache) > slCacheMax { for k := range s.cache { @@ -247,7 +252,7 @@ func (s *Sublist) Match(subject string) *SublistResult { } s.Unlock() - return result + return cr } // This will add in a node's results to the total results. diff --git a/server/sublist_test.go b/server/sublist_test.go index a7df7b15..afad7a7d 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1,3 +1,6 @@ +// Copyright 2012-2017 Apcera Inc. All rights reserved. +// Copyright 2018 Synadia Communications Inc. All rights reserved. + package server import ( @@ -509,6 +512,81 @@ func TestSublistRemoveWithWildcardsAsLiterals(t *testing.T) { } } +func TestSubListRaceOnRemove(t *testing.T) { + s := NewSublist() + + var ( + total = 100 + subs = make(map[int]*subscription, total) // use map for randomness + ) + for i := 0; i < total; i++ { + sub := newQSub("foo", "bar") + subs[i] = sub + } + + for i := 0; i < 2; i++ { + for _, sub := range subs { + s.Insert(sub) + } + // Call Match() once or twice, to make sure we get from cache + if i == 1 { + s.Match("foo") + } + // This will be from cache when i==1 + r := s.Match("foo") + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + for _, sub := range subs { + s.Remove(sub) + } + wg.Done() + }() + for _, qsub := range r.qsubs { + for i := 0; i < len(qsub); i++ { + sub := qsub[i] + if string(sub.queue) != "bar" { + t.Fatalf("Queue name should be bar, got %s", qsub[i].queue) + } + } + } + wg.Wait() + } + + // Repeat tests with regular subs + for i := 0; i < total; i++ { + sub := newSub("foo") + subs[i] = sub + } + + for i := 0; i < 2; i++ { + for _, sub := range subs { + s.Insert(sub) + } + // Call Match() once or twice, to make sure we get from cache + if i == 1 { + s.Match("foo") + } + // This will be from cache when i==1 + r := s.Match("foo") + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + for _, sub := range subs { + s.Remove(sub) + } + wg.Done() + }() + for i := 0; i < len(r.psubs); i++ { + sub := r.psubs[i] + if string(sub.subject) != "foo" { + t.Fatalf("Subject should be foo, got %s", sub.subject) + } + } + wg.Wait() + } +} + // -- Benchmarks Setup -- var subs []*subscription