mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
1391 lines
32 KiB
Go
1391 lines
32 KiB
Go
// Copyright 2016-2020 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// Sublist is a routing mechanism to handle subject distribution and
|
|
// provides a facility to match subjects from published messages to
|
|
// interested subscribers. Subscribers can have wildcard subjects to
|
|
// match multiple published subjects.
|
|
|
|
// Common byte variables for wildcards and token separator.
|
|
const (
|
|
pwc = '*'
|
|
fwc = '>'
|
|
tsep = "."
|
|
btsep = '.'
|
|
)
|
|
|
|
// Sublist related errors
|
|
var (
|
|
ErrInvalidSubject = errors.New("sublist: invalid subject")
|
|
ErrNotFound = errors.New("sublist: no matches found")
|
|
ErrNilChan = errors.New("sublist: nil channel")
|
|
ErrAlreadyRegistered = errors.New("sublist: notification already registered")
|
|
)
|
|
|
|
const (
|
|
// cacheMax is used to bound limit the frontend cache
|
|
slCacheMax = 1024
|
|
// If we run a sweeper we will drain to this count.
|
|
slCacheSweep = 512
|
|
// plistMin is our lower bounds to create a fast plist for Match.
|
|
plistMin = 256
|
|
)
|
|
|
|
// SublistResult is a result structure better optimized for queue subs.
|
|
type SublistResult struct {
|
|
psubs []*subscription
|
|
qsubs [][]*subscription // don't make this a map, too expensive to iterate
|
|
}
|
|
|
|
// A Sublist stores and efficiently retrieves subscriptions.
|
|
type Sublist struct {
|
|
sync.RWMutex
|
|
genid uint64
|
|
matches uint64
|
|
cacheHits uint64
|
|
inserts uint64
|
|
removes uint64
|
|
root *level
|
|
cache map[string]*SublistResult
|
|
ccSweep int32
|
|
notify *notifyMaps
|
|
count uint32
|
|
}
|
|
|
|
// notifyMaps holds maps of arrays of channels for notifications
|
|
// on a change of interest.
|
|
type notifyMaps struct {
|
|
insert map[string][]chan<- bool
|
|
remove map[string][]chan<- bool
|
|
}
|
|
|
|
// A node contains subscriptions and a pointer to the next level.
|
|
type node struct {
|
|
next *level
|
|
psubs map[*subscription]*subscription
|
|
qsubs map[string](map[*subscription]*subscription)
|
|
plist []*subscription
|
|
}
|
|
|
|
// A level represents a group of nodes and special pointers to
|
|
// wildcard nodes.
|
|
type level struct {
|
|
nodes map[string]*node
|
|
pwc, fwc *node
|
|
}
|
|
|
|
// Create a new default node.
|
|
func newNode() *node {
|
|
return &node{psubs: make(map[*subscription]*subscription)}
|
|
}
|
|
|
|
// Create a new default level.
|
|
func newLevel() *level {
|
|
return &level{nodes: make(map[string]*node)}
|
|
}
|
|
|
|
// In general caching is recommended however in some extreme cases where
|
|
// interest changes are high, suppressing the cache can help.
|
|
// https://github.com/nats-io/nats-server/issues/941
|
|
// FIXME(dlc) - should be more dynamic at some point based on cache thrashing.
|
|
|
|
// NewSublist will create a default sublist with caching enabled per the flag.
|
|
func NewSublist(enableCache bool) *Sublist {
|
|
if enableCache {
|
|
return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)}
|
|
}
|
|
return &Sublist{root: newLevel()}
|
|
}
|
|
|
|
// NewSublistWithCache will create a default sublist with caching enabled.
|
|
func NewSublistWithCache() *Sublist {
|
|
return NewSublist(true)
|
|
}
|
|
|
|
// NewSublistNoCache will create a default sublist with caching disabled.
|
|
func NewSublistNoCache() *Sublist {
|
|
return NewSublist(false)
|
|
}
|
|
|
|
// CacheEnabled returns whether or not caching is enabled for this sublist.
|
|
func (s *Sublist) CacheEnabled() bool {
|
|
s.RLock()
|
|
enabled := s.cache != nil
|
|
s.RUnlock()
|
|
return enabled
|
|
}
|
|
|
|
// RegisterNotification will register for notifications when interest for the given
|
|
// subject changes. The subject must be a literal publish type subject. The
|
|
// notification is true for when the first interest for a subject is inserted,
|
|
// and false when all interest in the subject is removed. Note that this interest
|
|
// needs to be exact and that wildcards will not trigger the notifications. The sublist
|
|
// will not block when trying to send the notification. Its up to the caller to make
|
|
// sure the channel send will not block.
|
|
func (s *Sublist) RegisterNotification(subject string, notify chan<- bool) error {
|
|
if subjectHasWildcard(subject) {
|
|
return ErrInvalidSubject
|
|
}
|
|
if notify == nil {
|
|
return ErrNilChan
|
|
}
|
|
|
|
var hasInterest bool
|
|
r := s.Match(subject)
|
|
|
|
if len(r.psubs)+len(r.qsubs) > 0 {
|
|
for _, sub := range r.psubs {
|
|
if string(sub.subject) == subject {
|
|
hasInterest = true
|
|
break
|
|
}
|
|
}
|
|
for _, qsub := range r.qsubs {
|
|
qs := qsub[0]
|
|
if string(qs.subject) == subject {
|
|
hasInterest = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
s.Lock()
|
|
if s.notify == nil {
|
|
s.notify = ¬ifyMaps{
|
|
insert: make(map[string][]chan<- bool),
|
|
remove: make(map[string][]chan<- bool),
|
|
}
|
|
}
|
|
|
|
var err error
|
|
if hasInterest {
|
|
err = s.addRemoveNotify(subject, notify)
|
|
} else {
|
|
err = s.addInsertNotify(subject, notify)
|
|
}
|
|
s.Unlock()
|
|
if err == nil {
|
|
sendNotification(notify, hasInterest)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Lock should be held.
|
|
func chkAndRemove(subject string, notify chan<- bool, ms map[string][]chan<- bool) bool {
|
|
chs := ms[subject]
|
|
for i, ch := range chs {
|
|
if ch == notify {
|
|
chs[i] = chs[len(chs)-1]
|
|
chs = chs[:len(chs)-1]
|
|
if len(chs) == 0 {
|
|
delete(ms, subject)
|
|
}
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (s *Sublist) ClearNotification(subject string, notify chan<- bool) bool {
|
|
s.Lock()
|
|
if s.notify == nil {
|
|
s.Unlock()
|
|
return false
|
|
}
|
|
// Check both, start with remove.
|
|
didRemove := chkAndRemove(subject, notify, s.notify.remove)
|
|
didRemove = didRemove || chkAndRemove(subject, notify, s.notify.insert)
|
|
// Check if everything is gone
|
|
if len(s.notify.remove)+len(s.notify.insert) == 0 {
|
|
s.notify = nil
|
|
}
|
|
s.Unlock()
|
|
return didRemove
|
|
}
|
|
|
|
func sendNotification(ch chan<- bool, hasInterest bool) {
|
|
select {
|
|
case ch <- hasInterest:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// Add a new channel for notification in insert map.
|
|
// Write lock should be held.
|
|
func (s *Sublist) addInsertNotify(subject string, notify chan<- bool) error {
|
|
return s.addNotify(s.notify.insert, subject, notify)
|
|
}
|
|
|
|
// Add a new channel for notification in removal map.
|
|
// Write lock should be held.
|
|
func (s *Sublist) addRemoveNotify(subject string, notify chan<- bool) error {
|
|
return s.addNotify(s.notify.remove, subject, notify)
|
|
}
|
|
|
|
// Add a new channel for notification.
|
|
// Write lock should be held.
|
|
func (s *Sublist) addNotify(m map[string][]chan<- bool, subject string, notify chan<- bool) error {
|
|
chs := m[subject]
|
|
if len(chs) > 0 {
|
|
// Check to see if this chan is alredy registered.
|
|
for _, ch := range chs {
|
|
if ch == notify {
|
|
return ErrAlreadyRegistered
|
|
}
|
|
}
|
|
}
|
|
m[subject] = append(chs, notify)
|
|
return nil
|
|
}
|
|
|
|
// chkForInsertNotification will check to see if we need to notify on this subject.
|
|
// Write lock should be held.
|
|
func (s *Sublist) chkForInsertNotification(subject string) {
|
|
// All notify subjects are also literal so just do a hash lookup here.
|
|
if chs := s.notify.insert[subject]; len(chs) > 0 {
|
|
for _, ch := range chs {
|
|
sendNotification(ch, true)
|
|
}
|
|
// Move from the insert map to the remove map.
|
|
s.notify.remove[subject] = append(s.notify.remove[subject], chs...)
|
|
delete(s.notify.insert, subject)
|
|
}
|
|
}
|
|
|
|
// chkForRemoveNotification will check to see if we need to notify on this subject.
|
|
// Write lock should be held.
|
|
func (s *Sublist) chkForRemoveNotification(subject string) {
|
|
if chs := s.notify.remove[subject]; len(chs) > 0 {
|
|
// We need to always check that we have no interest anymore.
|
|
var hasInterest bool
|
|
r := s.matchNoLock(subject)
|
|
|
|
if len(r.psubs)+len(r.qsubs) > 0 {
|
|
for _, sub := range r.psubs {
|
|
if string(sub.subject) == subject {
|
|
hasInterest = true
|
|
break
|
|
}
|
|
}
|
|
for _, qsub := range r.qsubs {
|
|
qs := qsub[0]
|
|
if string(qs.subject) == subject {
|
|
hasInterest = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if !hasInterest {
|
|
for _, ch := range chs {
|
|
sendNotification(ch, false)
|
|
}
|
|
// Move from the remove map to the insert map.
|
|
s.notify.insert[subject] = append(s.notify.insert[subject], chs...)
|
|
delete(s.notify.remove, subject)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Insert adds a subscription into the sublist
|
|
func (s *Sublist) Insert(sub *subscription) error {
|
|
// copy the subject since we hold this and this might be part of a large byte slice.
|
|
subject := string(sub.subject)
|
|
tsa := [32]string{}
|
|
tokens := tsa[:0]
|
|
start := 0
|
|
for i := 0; i < len(subject); i++ {
|
|
if subject[i] == btsep {
|
|
tokens = append(tokens, subject[start:i])
|
|
start = i + 1
|
|
}
|
|
}
|
|
tokens = append(tokens, subject[start:])
|
|
|
|
s.Lock()
|
|
|
|
var sfwc, haswc, isnew bool
|
|
var n *node
|
|
l := s.root
|
|
|
|
for _, t := range tokens {
|
|
lt := len(t)
|
|
if lt == 0 || sfwc {
|
|
s.Unlock()
|
|
return ErrInvalidSubject
|
|
}
|
|
|
|
if lt > 1 {
|
|
n = l.nodes[t]
|
|
} else {
|
|
switch t[0] {
|
|
case pwc:
|
|
n = l.pwc
|
|
haswc = true
|
|
case fwc:
|
|
n = l.fwc
|
|
haswc, sfwc = true, true
|
|
default:
|
|
n = l.nodes[t]
|
|
}
|
|
}
|
|
if n == nil {
|
|
isnew = true
|
|
n = newNode()
|
|
if lt > 1 {
|
|
l.nodes[t] = n
|
|
} else {
|
|
switch t[0] {
|
|
case pwc:
|
|
l.pwc = n
|
|
case fwc:
|
|
l.fwc = n
|
|
default:
|
|
l.nodes[t] = n
|
|
}
|
|
}
|
|
}
|
|
if n.next == nil {
|
|
n.next = newLevel()
|
|
}
|
|
l = n.next
|
|
}
|
|
if sub.queue == nil {
|
|
n.psubs[sub] = sub
|
|
if n.plist != nil {
|
|
n.plist = append(n.plist, sub)
|
|
} else if len(n.psubs) > plistMin {
|
|
n.plist = make([]*subscription, 0, len(n.psubs))
|
|
// Populate
|
|
for _, psub := range n.psubs {
|
|
n.plist = append(n.plist, psub)
|
|
}
|
|
}
|
|
} else {
|
|
if n.qsubs == nil {
|
|
n.qsubs = make(map[string]map[*subscription]*subscription)
|
|
}
|
|
qname := string(sub.queue)
|
|
// This is a queue subscription
|
|
subs, ok := n.qsubs[qname]
|
|
if !ok {
|
|
subs = make(map[*subscription]*subscription)
|
|
n.qsubs[qname] = subs
|
|
}
|
|
subs[sub] = sub
|
|
}
|
|
|
|
s.count++
|
|
s.inserts++
|
|
|
|
s.addToCache(subject, sub)
|
|
atomic.AddUint64(&s.genid, 1)
|
|
|
|
if s.notify != nil && isnew && !haswc && len(s.notify.insert) > 0 {
|
|
s.chkForInsertNotification(subject)
|
|
}
|
|
s.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Deep copy
|
|
func copyResult(r *SublistResult) *SublistResult {
|
|
nr := &SublistResult{}
|
|
nr.psubs = append([]*subscription(nil), r.psubs...)
|
|
for _, qr := range r.qsubs {
|
|
nqr := append([]*subscription(nil), qr...)
|
|
nr.qsubs = append(nr.qsubs, nqr)
|
|
}
|
|
return nr
|
|
}
|
|
|
|
// Adds a new sub to an existing result.
|
|
func (r *SublistResult) addSubToResult(sub *subscription) *SublistResult {
|
|
// Copy since others may have a reference.
|
|
nr := copyResult(r)
|
|
if sub.queue == nil {
|
|
nr.psubs = append(nr.psubs, sub)
|
|
} else {
|
|
if i := findQSlot(sub.queue, nr.qsubs); i >= 0 {
|
|
nr.qsubs[i] = append(nr.qsubs[i], sub)
|
|
} else {
|
|
nr.qsubs = append(nr.qsubs, []*subscription{sub})
|
|
}
|
|
}
|
|
return nr
|
|
}
|
|
|
|
// addToCache will add the new entry to the existing cache
|
|
// entries if needed. Assumes write lock is held.
|
|
// Assumes write lock is held.
|
|
func (s *Sublist) addToCache(subject string, sub *subscription) {
|
|
if s.cache == nil {
|
|
return
|
|
}
|
|
// If literal we can direct match.
|
|
if subjectIsLiteral(subject) {
|
|
if r := s.cache[subject]; r != nil {
|
|
s.cache[subject] = r.addSubToResult(sub)
|
|
}
|
|
return
|
|
}
|
|
for key, r := range s.cache {
|
|
if matchLiteral(key, subject) {
|
|
s.cache[key] = r.addSubToResult(sub)
|
|
}
|
|
}
|
|
}
|
|
|
|
// removeFromCache will remove the sub from any active cache entries.
|
|
// Assumes write lock is held.
|
|
func (s *Sublist) removeFromCache(subject string, sub *subscription) {
|
|
if s.cache == nil {
|
|
return
|
|
}
|
|
// If literal we can direct match.
|
|
if subjectIsLiteral(subject) {
|
|
delete(s.cache, subject)
|
|
return
|
|
}
|
|
// Wildcard here.
|
|
for key := range s.cache {
|
|
if matchLiteral(key, subject) {
|
|
delete(s.cache, key)
|
|
}
|
|
}
|
|
}
|
|
|
|
// a place holder for an empty result.
|
|
var emptyResult = &SublistResult{}
|
|
|
|
// Match will match all entries to the literal subject.
|
|
// It will return a set of results for both normal and queue subscribers.
|
|
func (s *Sublist) Match(subject string) *SublistResult {
|
|
return s.match(subject, true)
|
|
}
|
|
|
|
func (s *Sublist) matchNoLock(subject string) *SublistResult {
|
|
return s.match(subject, false)
|
|
}
|
|
|
|
func (s *Sublist) match(subject string, doLock bool) *SublistResult {
|
|
atomic.AddUint64(&s.matches, 1)
|
|
|
|
// Check cache first.
|
|
if doLock {
|
|
s.RLock()
|
|
}
|
|
r, ok := s.cache[subject]
|
|
if doLock {
|
|
s.RUnlock()
|
|
}
|
|
if ok {
|
|
atomic.AddUint64(&s.cacheHits, 1)
|
|
return r
|
|
}
|
|
|
|
tsa := [32]string{}
|
|
tokens := tsa[:0]
|
|
start := 0
|
|
for i := 0; i < len(subject); i++ {
|
|
if subject[i] == btsep {
|
|
tokens = append(tokens, subject[start:i])
|
|
start = i + 1
|
|
}
|
|
}
|
|
tokens = append(tokens, subject[start:])
|
|
|
|
// FIXME(dlc) - Make shared pool between sublist and client readLoop?
|
|
result := &SublistResult{}
|
|
|
|
// Get result from the main structure and place into the shared cache.
|
|
// Hold the read lock to avoid race between match and store.
|
|
var n int
|
|
|
|
if doLock {
|
|
s.Lock()
|
|
}
|
|
|
|
matchLevel(s.root, tokens, result)
|
|
// Check for empty result.
|
|
if len(result.psubs) == 0 && len(result.qsubs) == 0 {
|
|
result = emptyResult
|
|
}
|
|
if s.cache != nil {
|
|
s.cache[subject] = result
|
|
n = len(s.cache)
|
|
}
|
|
if doLock {
|
|
s.Unlock()
|
|
}
|
|
|
|
// Reduce the cache count if we have exceeded our set maximum.
|
|
if n > slCacheMax && atomic.CompareAndSwapInt32(&s.ccSweep, 0, 1) {
|
|
go s.reduceCacheCount()
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// Remove entries in the cache until we are under the maximum.
|
|
// TODO(dlc) this could be smarter now that its not inline.
|
|
func (s *Sublist) reduceCacheCount() {
|
|
defer atomic.StoreInt32(&s.ccSweep, 0)
|
|
// If we are over the cache limit randomly drop until under the limit.
|
|
s.Lock()
|
|
for key := range s.cache {
|
|
delete(s.cache, key)
|
|
if len(s.cache) <= slCacheSweep {
|
|
break
|
|
}
|
|
}
|
|
s.Unlock()
|
|
}
|
|
|
|
// Helper function for auto-expanding remote qsubs.
|
|
func isRemoteQSub(sub *subscription) bool {
|
|
return sub != nil && sub.queue != nil && sub.client != nil && sub.client.kind == ROUTER
|
|
}
|
|
|
|
// UpdateRemoteQSub should be called when we update the weight of an existing
|
|
// remote queue sub.
|
|
func (s *Sublist) UpdateRemoteQSub(sub *subscription) {
|
|
// We could search to make sure we find it, but probably not worth
|
|
// it unless we are thrashing the cache. Just remove from our L2 and update
|
|
// the genid so L1 will be flushed.
|
|
s.Lock()
|
|
s.removeFromCache(string(sub.subject), sub)
|
|
atomic.AddUint64(&s.genid, 1)
|
|
s.Unlock()
|
|
}
|
|
|
|
// This will add in a node's results to the total results.
|
|
func addNodeToResults(n *node, results *SublistResult) {
|
|
// Normal subscriptions
|
|
if n.plist != nil {
|
|
results.psubs = append(results.psubs, n.plist...)
|
|
} else {
|
|
for _, psub := range n.psubs {
|
|
results.psubs = append(results.psubs, psub)
|
|
}
|
|
}
|
|
// Queue subscriptions
|
|
for qname, qr := range n.qsubs {
|
|
if len(qr) == 0 {
|
|
continue
|
|
}
|
|
// Need to find matching list in results
|
|
var i int
|
|
if i = findQSlot([]byte(qname), results.qsubs); i < 0 {
|
|
i = len(results.qsubs)
|
|
nqsub := make([]*subscription, 0, len(qr))
|
|
results.qsubs = append(results.qsubs, nqsub)
|
|
}
|
|
for _, sub := range qr {
|
|
if isRemoteQSub(sub) {
|
|
ns := atomic.LoadInt32(&sub.qw)
|
|
// Shadow these subscriptions
|
|
for n := 0; n < int(ns); n++ {
|
|
results.qsubs[i] = append(results.qsubs[i], sub)
|
|
}
|
|
} else {
|
|
results.qsubs[i] = append(results.qsubs[i], sub)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// We do not use a map here since we want iteration to be past when
|
|
// processing publishes in L1 on client. So we need to walk sequentially
|
|
// for now. Keep an eye on this in case we start getting large number of
|
|
// different queue subscribers for the same subject.
|
|
func findQSlot(queue []byte, qsl [][]*subscription) int {
|
|
if queue == nil {
|
|
return -1
|
|
}
|
|
for i, qr := range qsl {
|
|
if len(qr) > 0 && bytes.Equal(queue, qr[0].queue) {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
// matchLevel is used to recursively descend into the trie.
|
|
func matchLevel(l *level, toks []string, results *SublistResult) {
|
|
var pwc, n *node
|
|
for i, t := range toks {
|
|
if l == nil {
|
|
return
|
|
}
|
|
if l.fwc != nil {
|
|
addNodeToResults(l.fwc, results)
|
|
}
|
|
if pwc = l.pwc; pwc != nil {
|
|
matchLevel(pwc.next, toks[i+1:], results)
|
|
}
|
|
n = l.nodes[t]
|
|
if n != nil {
|
|
l = n.next
|
|
} else {
|
|
l = nil
|
|
}
|
|
}
|
|
if n != nil {
|
|
addNodeToResults(n, results)
|
|
}
|
|
if pwc != nil {
|
|
addNodeToResults(pwc, results)
|
|
}
|
|
}
|
|
|
|
// lnt is used to track descent into levels for a removal for pruning.
|
|
type lnt struct {
|
|
l *level
|
|
n *node
|
|
t string
|
|
}
|
|
|
|
// Raw low level remove, can do batches with lock held outside.
|
|
func (s *Sublist) remove(sub *subscription, shouldLock bool, doCacheUpdates bool) error {
|
|
subject := string(sub.subject)
|
|
tsa := [32]string{}
|
|
tokens := tsa[:0]
|
|
start := 0
|
|
for i := 0; i < len(subject); i++ {
|
|
if subject[i] == btsep {
|
|
tokens = append(tokens, subject[start:i])
|
|
start = i + 1
|
|
}
|
|
}
|
|
tokens = append(tokens, subject[start:])
|
|
|
|
if shouldLock {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
}
|
|
|
|
var sfwc, haswc, last bool
|
|
var n *node
|
|
l := s.root
|
|
|
|
// Track levels for pruning
|
|
var lnts [32]lnt
|
|
levels := lnts[:0]
|
|
|
|
for _, t := range tokens {
|
|
lt := len(t)
|
|
if lt == 0 || sfwc {
|
|
return ErrInvalidSubject
|
|
}
|
|
if l == nil {
|
|
return ErrNotFound
|
|
}
|
|
if lt > 1 {
|
|
n = l.nodes[t]
|
|
} else {
|
|
switch t[0] {
|
|
case pwc:
|
|
n = l.pwc
|
|
haswc = true
|
|
case fwc:
|
|
n = l.fwc
|
|
haswc, sfwc = true, true
|
|
default:
|
|
n = l.nodes[t]
|
|
}
|
|
}
|
|
if n != nil {
|
|
levels = append(levels, lnt{l, n, t})
|
|
l = n.next
|
|
} else {
|
|
l = nil
|
|
}
|
|
}
|
|
if !s.removeFromNode(n, sub) {
|
|
return ErrNotFound
|
|
}
|
|
|
|
s.count--
|
|
s.removes++
|
|
|
|
for i := len(levels) - 1; i >= 0; i-- {
|
|
l, n, t := levels[i].l, levels[i].n, levels[i].t
|
|
if n.isEmpty() {
|
|
last = true
|
|
l.pruneNode(n, t)
|
|
}
|
|
}
|
|
if doCacheUpdates {
|
|
s.removeFromCache(subject, sub)
|
|
atomic.AddUint64(&s.genid, 1)
|
|
}
|
|
|
|
if s.notify != nil && last && !haswc && len(s.notify.remove) > 0 {
|
|
s.chkForRemoveNotification(subject)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Remove will remove a subscription.
|
|
func (s *Sublist) Remove(sub *subscription) error {
|
|
return s.remove(sub, true, true)
|
|
}
|
|
|
|
// RemoveBatch will remove a list of subscriptions.
|
|
func (s *Sublist) RemoveBatch(subs []*subscription) error {
|
|
if len(subs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
// TODO(dlc) - We could try to be smarter here for a client going away but the account
|
|
// has a large number of subscriptions compared to this client. Quick and dirty testing
|
|
// though said just disabling all the time best for now.
|
|
|
|
// Turn off our cache if enabled.
|
|
wasEnabled := s.cache != nil
|
|
s.cache = nil
|
|
for _, sub := range subs {
|
|
if err := s.remove(sub, false, false); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
// Turn caching back on here.
|
|
atomic.AddUint64(&s.genid, 1)
|
|
if wasEnabled {
|
|
s.cache = make(map[string]*SublistResult)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// pruneNode is used to prune an empty node from the tree.
|
|
func (l *level) pruneNode(n *node, t string) {
|
|
if n == nil {
|
|
return
|
|
}
|
|
if n == l.fwc {
|
|
l.fwc = nil
|
|
} else if n == l.pwc {
|
|
l.pwc = nil
|
|
} else {
|
|
delete(l.nodes, t)
|
|
}
|
|
}
|
|
|
|
// isEmpty will test if the node has any entries. Used
|
|
// in pruning.
|
|
func (n *node) isEmpty() bool {
|
|
if len(n.psubs) == 0 && len(n.qsubs) == 0 {
|
|
if n.next == nil || n.next.numNodes() == 0 {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Return the number of nodes for the given level.
|
|
func (l *level) numNodes() int {
|
|
num := len(l.nodes)
|
|
if l.pwc != nil {
|
|
num++
|
|
}
|
|
if l.fwc != nil {
|
|
num++
|
|
}
|
|
return num
|
|
}
|
|
|
|
// Remove the sub for the given node.
|
|
func (s *Sublist) removeFromNode(n *node, sub *subscription) (found bool) {
|
|
if n == nil {
|
|
return false
|
|
}
|
|
if sub.queue == nil {
|
|
_, found = n.psubs[sub]
|
|
delete(n.psubs, sub)
|
|
if found && n.plist != nil {
|
|
// This will brute force remove the plist to perform
|
|
// correct behavior. Will get re-populated on a call
|
|
// to Match as needed.
|
|
n.plist = nil
|
|
}
|
|
return found
|
|
}
|
|
|
|
// We have a queue group subscription here
|
|
qsub := n.qsubs[string(sub.queue)]
|
|
_, found = qsub[sub]
|
|
delete(qsub, sub)
|
|
if len(qsub) == 0 {
|
|
delete(n.qsubs, string(sub.queue))
|
|
}
|
|
return found
|
|
}
|
|
|
|
// Count returns the number of subscriptions.
|
|
func (s *Sublist) Count() uint32 {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
return s.count
|
|
}
|
|
|
|
// CacheCount returns the number of result sets in the cache.
|
|
func (s *Sublist) CacheCount() int {
|
|
s.RLock()
|
|
cc := len(s.cache)
|
|
s.RUnlock()
|
|
return cc
|
|
}
|
|
|
|
// SublistStats are public stats for the sublist
|
|
type SublistStats struct {
|
|
NumSubs uint32 `json:"num_subscriptions"`
|
|
NumCache uint32 `json:"num_cache"`
|
|
NumInserts uint64 `json:"num_inserts"`
|
|
NumRemoves uint64 `json:"num_removes"`
|
|
NumMatches uint64 `json:"num_matches"`
|
|
CacheHitRate float64 `json:"cache_hit_rate"`
|
|
MaxFanout uint32 `json:"max_fanout"`
|
|
AvgFanout float64 `json:"avg_fanout"`
|
|
totFanout int
|
|
cacheCnt int
|
|
}
|
|
|
|
func (s *SublistStats) add(stat *SublistStats) {
|
|
s.NumSubs += stat.NumSubs
|
|
s.NumCache += stat.NumCache
|
|
s.NumInserts += stat.NumInserts
|
|
s.NumRemoves += stat.NumRemoves
|
|
s.NumMatches += stat.NumMatches
|
|
s.CacheHitRate += stat.CacheHitRate
|
|
if s.MaxFanout < stat.MaxFanout {
|
|
s.MaxFanout = stat.MaxFanout
|
|
}
|
|
|
|
// ignore slStats.AvgFanout, collect the values
|
|
// it's based on instead
|
|
s.totFanout += stat.totFanout
|
|
s.cacheCnt += stat.cacheCnt
|
|
if s.totFanout > 0 {
|
|
s.AvgFanout = float64(s.totFanout) / float64(s.cacheCnt)
|
|
}
|
|
}
|
|
|
|
// Stats will return a stats structure for the current state.
|
|
func (s *Sublist) Stats() *SublistStats {
|
|
st := &SublistStats{}
|
|
|
|
s.RLock()
|
|
cache := s.cache
|
|
cc := len(s.cache)
|
|
st.NumSubs = s.count
|
|
st.NumInserts = s.inserts
|
|
st.NumRemoves = s.removes
|
|
s.RUnlock()
|
|
|
|
st.NumCache = uint32(cc)
|
|
st.NumMatches = atomic.LoadUint64(&s.matches)
|
|
if st.NumMatches > 0 {
|
|
st.CacheHitRate = float64(atomic.LoadUint64(&s.cacheHits)) / float64(st.NumMatches)
|
|
}
|
|
|
|
// whip through cache for fanout stats, this can be off if cache is full and doing evictions.
|
|
// If this is called frequently, which it should not be, this could hurt performance.
|
|
if cache != nil {
|
|
tot, max, clen := 0, 0, 0
|
|
s.RLock()
|
|
for _, r := range s.cache {
|
|
clen++
|
|
l := len(r.psubs) + len(r.qsubs)
|
|
tot += l
|
|
if l > max {
|
|
max = l
|
|
}
|
|
}
|
|
s.RUnlock()
|
|
st.totFanout = tot
|
|
st.cacheCnt = clen
|
|
st.MaxFanout = uint32(max)
|
|
if tot > 0 {
|
|
st.AvgFanout = float64(tot) / float64(clen)
|
|
}
|
|
}
|
|
return st
|
|
}
|
|
|
|
// numLevels will return the maximum number of levels
|
|
// contained in the Sublist tree.
|
|
func (s *Sublist) numLevels() int {
|
|
return visitLevel(s.root, 0)
|
|
}
|
|
|
|
// visitLevel is used to descend the Sublist tree structure
|
|
// recursively.
|
|
func visitLevel(l *level, depth int) int {
|
|
if l == nil || l.numNodes() == 0 {
|
|
return depth
|
|
}
|
|
|
|
depth++
|
|
maxDepth := depth
|
|
|
|
for _, n := range l.nodes {
|
|
if n == nil {
|
|
continue
|
|
}
|
|
newDepth := visitLevel(n.next, depth)
|
|
if newDepth > maxDepth {
|
|
maxDepth = newDepth
|
|
}
|
|
}
|
|
if l.pwc != nil {
|
|
pwcDepth := visitLevel(l.pwc.next, depth)
|
|
if pwcDepth > maxDepth {
|
|
maxDepth = pwcDepth
|
|
}
|
|
}
|
|
if l.fwc != nil {
|
|
fwcDepth := visitLevel(l.fwc.next, depth)
|
|
if fwcDepth > maxDepth {
|
|
maxDepth = fwcDepth
|
|
}
|
|
}
|
|
return maxDepth
|
|
}
|
|
|
|
// Determine if a subject has any wildcard tokens.
|
|
func subjectHasWildcard(subject string) bool {
|
|
return !subjectIsLiteral(subject)
|
|
}
|
|
|
|
// Determine if the subject has any wildcards. Fast version, does not check for
|
|
// valid subject. Used in caching layer.
|
|
func subjectIsLiteral(subject string) bool {
|
|
for i, c := range subject {
|
|
if c == pwc || c == fwc {
|
|
if (i == 0 || subject[i-1] == btsep) &&
|
|
(i+1 == len(subject) || subject[i+1] == btsep) {
|
|
return false
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// IsValidPublishSubject returns true if a subject is valid and a literal, false otherwise
|
|
func IsValidPublishSubject(subject string) bool {
|
|
return IsValidSubject(subject) && subjectIsLiteral(subject)
|
|
}
|
|
|
|
// IsValidSubject returns true if a subject is valid, false otherwise
|
|
func IsValidSubject(subject string) bool {
|
|
if subject == "" {
|
|
return false
|
|
}
|
|
sfwc := false
|
|
tokens := strings.Split(subject, tsep)
|
|
for _, t := range tokens {
|
|
if len(t) == 0 || sfwc {
|
|
return false
|
|
}
|
|
if len(t) > 1 {
|
|
continue
|
|
}
|
|
switch t[0] {
|
|
case fwc:
|
|
sfwc = true
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Will share relevant info regarding the subject.
|
|
// Returns valid, tokens, num pwcs, has fwc.
|
|
func subjectInfo(subject string) (bool, []string, int, bool) {
|
|
if subject == "" {
|
|
return false, nil, 0, false
|
|
}
|
|
npwcs := 0
|
|
sfwc := false
|
|
tokens := strings.Split(subject, tsep)
|
|
for _, t := range tokens {
|
|
if len(t) == 0 || sfwc {
|
|
return false, nil, 0, false
|
|
}
|
|
if len(t) > 1 {
|
|
continue
|
|
}
|
|
switch t[0] {
|
|
case fwc:
|
|
sfwc = true
|
|
case pwc:
|
|
npwcs++
|
|
}
|
|
}
|
|
return true, tokens, npwcs, sfwc
|
|
}
|
|
|
|
// IsValidLiteralSubject returns true if a subject is valid and literal (no wildcards), false otherwise
|
|
func IsValidLiteralSubject(subject string) bool {
|
|
return isValidLiteralSubject(strings.Split(subject, tsep))
|
|
}
|
|
|
|
// isValidLiteralSubject returns true if the tokens are valid and literal (no wildcards), false otherwise
|
|
func isValidLiteralSubject(tokens []string) bool {
|
|
for _, t := range tokens {
|
|
if len(t) == 0 {
|
|
return false
|
|
}
|
|
if len(t) > 1 {
|
|
continue
|
|
}
|
|
switch t[0] {
|
|
case pwc, fwc:
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Will check tokens and report back if the have any partial or full wildcards.
|
|
func analyzeTokens(tokens []string) (hasPWC, hasFWC bool) {
|
|
for _, t := range tokens {
|
|
if lt := len(t); lt == 0 || lt > 1 {
|
|
continue
|
|
}
|
|
switch t[0] {
|
|
case pwc:
|
|
hasPWC = true
|
|
case fwc:
|
|
hasFWC = true
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// Check on a token basis if they could match.
|
|
func tokensCanMatch(t1, t2 string) bool {
|
|
if len(t1) == 0 || len(t2) == 0 {
|
|
return false
|
|
}
|
|
t1c, t2c := t1[0], t2[0]
|
|
if t1c == pwc || t2c == pwc || t1c == fwc || t2c == fwc {
|
|
return true
|
|
}
|
|
return t1 == t2
|
|
}
|
|
|
|
// SubjectsCollide will determine if two subjects could both match a single literal subject.
|
|
func SubjectsCollide(subj1, subj2 string) bool {
|
|
toks1 := strings.Split(subj1, tsep)
|
|
toks2 := strings.Split(subj2, tsep)
|
|
pwc1, fwc1 := analyzeTokens(toks1)
|
|
pwc2, fwc2 := analyzeTokens(toks2)
|
|
// if both literal just string compare.
|
|
l1, l2 := !(pwc1 || fwc1), !(pwc2 || fwc2)
|
|
if l1 && l2 {
|
|
return subj1 == subj2
|
|
}
|
|
// So one or both have wildcards. If one is literal than we can do subset matching.
|
|
if l1 && !l2 {
|
|
return isSubsetMatch(toks1, subj2)
|
|
} else if l2 && !l1 {
|
|
return isSubsetMatch(toks2, subj1)
|
|
}
|
|
// Both have wildcards.
|
|
stop := len(toks1)
|
|
if len(toks2) < stop {
|
|
stop = len(toks2)
|
|
}
|
|
// We look for reasons to say no.
|
|
for i := 0; i < stop; i++ {
|
|
t1, t2 := toks1[i], toks2[i]
|
|
if !tokensCanMatch(t1, t2) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Returns number of tokens in the subject.
|
|
func numTokens(subject string) int {
|
|
var numTokens int
|
|
if len(subject) == 0 {
|
|
return 0
|
|
}
|
|
for i := 0; i < len(subject); i++ {
|
|
if subject[i] == btsep {
|
|
numTokens++
|
|
}
|
|
}
|
|
return numTokens + 1
|
|
}
|
|
|
|
// Fast way to return an indexed token.
|
|
// This is one based, so first token is TokenAt(subject, 1)
|
|
func tokenAt(subject string, index uint8) string {
|
|
ti, start := uint8(1), 0
|
|
for i := 0; i < len(subject); i++ {
|
|
if subject[i] == btsep {
|
|
if ti == index {
|
|
return subject[start:i]
|
|
}
|
|
start = i + 1
|
|
ti++
|
|
}
|
|
}
|
|
if ti == index {
|
|
return subject[start:]
|
|
}
|
|
return _EMPTY_
|
|
}
|
|
|
|
// Calls into the function isSubsetMatch()
|
|
func subjectIsSubsetMatch(subject, test string) bool {
|
|
tsa := [32]string{}
|
|
tts := tsa[:0]
|
|
start := 0
|
|
for i := 0; i < len(subject); i++ {
|
|
if subject[i] == btsep {
|
|
tts = append(tts, subject[start:i])
|
|
start = i + 1
|
|
}
|
|
}
|
|
tts = append(tts, subject[start:])
|
|
return isSubsetMatch(tts, test)
|
|
}
|
|
|
|
// This will test a subject as an array of tokens against a test subject
|
|
// and determine if the tokens are matched. Both test subject and tokens
|
|
// may contain wildcards. So foo.* is a subset match of [">", "*.*", "foo.*"],
|
|
// but not of foo.bar, etc.
|
|
func isSubsetMatch(tokens []string, test string) bool {
|
|
tsa := [32]string{}
|
|
tts := tsa[:0]
|
|
start := 0
|
|
for i := 0; i < len(test); i++ {
|
|
if test[i] == btsep {
|
|
tts = append(tts, test[start:i])
|
|
start = i + 1
|
|
}
|
|
}
|
|
tts = append(tts, test[start:])
|
|
|
|
// Walk the target tokens
|
|
for i, t2 := range tts {
|
|
if i >= len(tokens) {
|
|
return false
|
|
}
|
|
l := len(t2)
|
|
if l == 0 {
|
|
return false
|
|
}
|
|
if t2[0] == fwc && l == 1 {
|
|
return true
|
|
}
|
|
t1 := tokens[i]
|
|
|
|
l = len(t1)
|
|
if l == 0 || t1[0] == fwc && l == 1 {
|
|
return false
|
|
}
|
|
|
|
if t1[0] == pwc && len(t1) == 1 {
|
|
m := t2[0] == pwc && len(t2) == 1
|
|
if !m {
|
|
return false
|
|
}
|
|
if i >= len(tts) {
|
|
return true
|
|
}
|
|
continue
|
|
}
|
|
if t2[0] != pwc && strings.Compare(t1, t2) != 0 {
|
|
return false
|
|
}
|
|
}
|
|
return len(tokens) == len(tts)
|
|
}
|
|
|
|
// matchLiteral is used to test literal subjects, those that do not have any
|
|
// wildcards, with a target subject. This is used in the cache layer.
|
|
func matchLiteral(literal, subject string) bool {
|
|
li := 0
|
|
ll := len(literal)
|
|
ls := len(subject)
|
|
for i := 0; i < ls; i++ {
|
|
if li >= ll {
|
|
return false
|
|
}
|
|
// This function has been optimized for speed.
|
|
// For instance, do not set b:=subject[i] here since
|
|
// we may bump `i` in this loop to avoid `continue` or
|
|
// skipping common test in a particular test.
|
|
// Run Benchmark_SublistMatchLiteral before making any change.
|
|
switch subject[i] {
|
|
case pwc:
|
|
// NOTE: This is not testing validity of a subject, instead ensures
|
|
// that wildcards are treated as such if they follow some basic rules,
|
|
// namely that they are a token on their own.
|
|
if i == 0 || subject[i-1] == btsep {
|
|
if i == ls-1 {
|
|
// There is no more token in the subject after this wildcard.
|
|
// Skip token in literal and expect to not find a separator.
|
|
for {
|
|
// End of literal, this is a match.
|
|
if li >= ll {
|
|
return true
|
|
}
|
|
// Presence of separator, this can't be a match.
|
|
if literal[li] == btsep {
|
|
return false
|
|
}
|
|
li++
|
|
}
|
|
} else if subject[i+1] == btsep {
|
|
// There is another token in the subject after this wildcard.
|
|
// Skip token in literal and expect to get a separator.
|
|
for {
|
|
// We found the end of the literal before finding a separator,
|
|
// this can't be a match.
|
|
if li >= ll {
|
|
return false
|
|
}
|
|
if literal[li] == btsep {
|
|
break
|
|
}
|
|
li++
|
|
}
|
|
// Bump `i` since we know there is a `.` following, we are
|
|
// safe. The common test below is going to check `.` with `.`
|
|
// which is good. A `continue` here is too costly.
|
|
i++
|
|
}
|
|
}
|
|
case fwc:
|
|
// For `>` to be a wildcard, it means being the only or last character
|
|
// in the string preceded by a `.`
|
|
if (i == 0 || subject[i-1] == btsep) && i == ls-1 {
|
|
return true
|
|
}
|
|
}
|
|
if subject[i] != literal[li] {
|
|
return false
|
|
}
|
|
li++
|
|
}
|
|
// Make sure we have processed all of the literal's chars..
|
|
return li >= ll
|
|
}
|
|
|
|
func addLocalSub(sub *subscription, subs *[]*subscription) {
|
|
if sub != nil && sub.client != nil &&
|
|
(sub.client.kind == CLIENT || sub.client.kind == SYSTEM || sub.client.kind == JETSTREAM || sub.client.kind == ACCOUNT) && sub.im == nil {
|
|
*subs = append(*subs, sub)
|
|
}
|
|
}
|
|
|
|
func (s *Sublist) addNodeToSubs(n *node, subs *[]*subscription) {
|
|
// Normal subscriptions
|
|
if n.plist != nil {
|
|
for _, sub := range n.plist {
|
|
addLocalSub(sub, subs)
|
|
}
|
|
} else {
|
|
for _, sub := range n.psubs {
|
|
addLocalSub(sub, subs)
|
|
}
|
|
}
|
|
// Queue subscriptions
|
|
for _, qr := range n.qsubs {
|
|
for _, sub := range qr {
|
|
addLocalSub(sub, subs)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Sublist) collectLocalSubs(l *level, subs *[]*subscription) {
|
|
for _, n := range l.nodes {
|
|
s.addNodeToSubs(n, subs)
|
|
s.collectLocalSubs(n.next, subs)
|
|
}
|
|
if l.pwc != nil {
|
|
s.addNodeToSubs(l.pwc, subs)
|
|
s.collectLocalSubs(l.pwc.next, subs)
|
|
}
|
|
if l.fwc != nil {
|
|
s.addNodeToSubs(l.fwc, subs)
|
|
s.collectLocalSubs(l.fwc.next, subs)
|
|
}
|
|
}
|
|
|
|
// Return all local client subscriptions. Use the supplied slice.
|
|
func (s *Sublist) localSubs(subs *[]*subscription) {
|
|
s.RLock()
|
|
s.collectLocalSubs(s.root, subs)
|
|
s.RUnlock()
|
|
}
|
|
|
|
// All is used to collect all subscriptions.
|
|
func (s *Sublist) All(subs *[]*subscription) {
|
|
s.RLock()
|
|
s.collectAllSubs(s.root, subs)
|
|
s.RUnlock()
|
|
}
|
|
|
|
func (s *Sublist) addAllNodeToSubs(n *node, subs *[]*subscription) {
|
|
// Normal subscriptions
|
|
if n.plist != nil {
|
|
*subs = append(*subs, n.plist...)
|
|
} else {
|
|
for _, sub := range n.psubs {
|
|
*subs = append(*subs, sub)
|
|
}
|
|
}
|
|
// Queue subscriptions
|
|
for _, qr := range n.qsubs {
|
|
for _, sub := range qr {
|
|
*subs = append(*subs, sub)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Sublist) collectAllSubs(l *level, subs *[]*subscription) {
|
|
for _, n := range l.nodes {
|
|
s.addAllNodeToSubs(n, subs)
|
|
s.collectAllSubs(n.next, subs)
|
|
}
|
|
if l.pwc != nil {
|
|
s.addAllNodeToSubs(l.pwc, subs)
|
|
s.collectAllSubs(l.pwc.next, subs)
|
|
}
|
|
if l.fwc != nil {
|
|
s.addAllNodeToSubs(l.fwc, subs)
|
|
s.collectAllSubs(l.fwc.next, subs)
|
|
}
|
|
}
|