mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] RACE between sublist remove and go through match results
This would manifest for instance when server tries to send messages to queue subscribers and a subscription is unsubsribed at the same time. Resolves #640
This commit is contained in:
@@ -236,8 +236,13 @@ func (s *Sublist) Match(subject string) *SublistResult {
|
||||
s.Lock()
|
||||
matchLevel(s.root, tokens, result)
|
||||
|
||||
// Store in cache and return to caller a copy of the results to avoid
|
||||
// race when sub is removed from sublist and caller walks through the
|
||||
// results.
|
||||
cr := copyResult(result)
|
||||
|
||||
// Add to our cache
|
||||
s.cache[subject] = result
|
||||
s.cache[subject] = cr
|
||||
// Bound the number of entries to sublistMaxCache
|
||||
if len(s.cache) > slCacheMax {
|
||||
for k := range s.cache {
|
||||
@@ -247,7 +252,7 @@ func (s *Sublist) Match(subject string) *SublistResult {
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
return result
|
||||
return cr
|
||||
}
|
||||
|
||||
// This will add in a node's results to the total results.
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright 2012-2017 Apcera Inc. All rights reserved.
|
||||
// Copyright 2018 Synadia Communications Inc. All rights reserved.
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
@@ -509,6 +512,81 @@ func TestSublistRemoveWithWildcardsAsLiterals(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubListRaceOnRemove(t *testing.T) {
|
||||
s := NewSublist()
|
||||
|
||||
var (
|
||||
total = 100
|
||||
subs = make(map[int]*subscription, total) // use map for randomness
|
||||
)
|
||||
for i := 0; i < total; i++ {
|
||||
sub := newQSub("foo", "bar")
|
||||
subs[i] = sub
|
||||
}
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
for _, sub := range subs {
|
||||
s.Insert(sub)
|
||||
}
|
||||
// Call Match() once or twice, to make sure we get from cache
|
||||
if i == 1 {
|
||||
s.Match("foo")
|
||||
}
|
||||
// This will be from cache when i==1
|
||||
r := s.Match("foo")
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for _, sub := range subs {
|
||||
s.Remove(sub)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
for _, qsub := range r.qsubs {
|
||||
for i := 0; i < len(qsub); i++ {
|
||||
sub := qsub[i]
|
||||
if string(sub.queue) != "bar" {
|
||||
t.Fatalf("Queue name should be bar, got %s", qsub[i].queue)
|
||||
}
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Repeat tests with regular subs
|
||||
for i := 0; i < total; i++ {
|
||||
sub := newSub("foo")
|
||||
subs[i] = sub
|
||||
}
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
for _, sub := range subs {
|
||||
s.Insert(sub)
|
||||
}
|
||||
// Call Match() once or twice, to make sure we get from cache
|
||||
if i == 1 {
|
||||
s.Match("foo")
|
||||
}
|
||||
// This will be from cache when i==1
|
||||
r := s.Match("foo")
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for _, sub := range subs {
|
||||
s.Remove(sub)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
for i := 0; i < len(r.psubs); i++ {
|
||||
sub := r.psubs[i]
|
||||
if string(sub.subject) != "foo" {
|
||||
t.Fatalf("Subject should be foo, got %s", sub.subject)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
// -- Benchmarks Setup --
|
||||
|
||||
var subs []*subscription
|
||||
|
||||
Reference in New Issue
Block a user