Removed sublist, hash and hashmap, no longer needed.

This commit is contained in:
Derek Collison
2016-04-02 12:13:28 -07:00
parent fc3e345eb2
commit df02bc0bcf
21 changed files with 1071 additions and 2389 deletions

View File

@@ -1,5 +1,6 @@
language: go
go:
- 1.6
- 1.5
install:
- go get github.com/nats-io/nats

View File

@@ -1,6 +1,8 @@
# General
- [ ] Multiple listen endpoints
- [ ] Listen configure key vs addr and port
- [ ] Multiple Auth
- [ ] Authorization / Access
- [ ] T series reservations
@@ -8,7 +10,6 @@
- [ ] No downtime restart
- [ ] Signal based reload of configuration
- [ ] Dynamic socket buffer sizes
- [ ] Switch to 1.4/1.5 and use maps vs hashmaps in sublist
- [ ] brew, apt-get, rpm, chocolatey (windows)
- [ ] Sublist better at high concurrency, cache uses writelock always currently
- [ ] Buffer pools/sync pools?
@@ -19,8 +20,9 @@
- [ ] Memory limits/warnings?
- [ ] Limit number of subscriptions a client can have, total memory usage etc.
- [ ] Info updates contain other implicit route servers
- [ ] Multi-tenant accounts with isolation of subject space
- [ ] Pedantic state
- [ ] Multi-tenant accounts with isolation of subject space
- [X] Switch to 1.4/1.5 and use maps vs hashmaps in sublist
- [X] NewSource on Rand to lower lock contention on QueueSubs, or redesign!
- [X] Default sort by cid on connz
- [X] Track last activity time per connection?

View File

@@ -1,4 +1,4 @@
// Copyright 2013 Apcera Inc. All rights reserved.
// Copyright 2013-2016 Apcera Inc. All rights reserved.
// Package conf supports a configuration file format used by gnatsd. It is
// a flexible format that combines the best of traditional
@@ -46,7 +46,6 @@ func Parse(data string) (map[string]interface{}, error) {
}
func parse(data string) (p *parser, err error) {
p = &parser{
mapping: make(map[string]interface{}),
lx: lex(data),

View File

@@ -1,204 +0,0 @@
// Copyright 2012 Apcera Inc. All rights reserved.
// Package hash is a collection of high performance 32-bit hash functions.
package hash
import (
"unsafe"
)
// Bernstein computes the classic djb2 hash of data: starting from the
// magic seed 5381, each byte is folded in as h = h*33 + b (the shifted
// form (h<<5)+h used elsewhere is the same multiplication).
func Bernstein(data []byte) uint32 {
	h := uint32(5381)
	for i := 0; i < len(data); i++ {
		h = h*33 + uint32(data[i])
	}
	return h
}
// Constants for FNV1A and derivatives.
const (
	_OFF32 = 2166136261 // 32-bit FNV offset basis
	_P32   = 16777619   // 32-bit FNV prime
	_YP32  = 709607     // prime used by the sanmayce FNV1A derivatives
)

// FNV1A Hash [http://en.wikipedia.org/wiki/Fowler-Noll-Vo_hash_function]
// Each byte is XORed into the state, then the state is multiplied by
// the FNV prime (modulo 2^32).
func FNV1A(data []byte) uint32 {
	h := uint32(_OFF32)
	for i := 0; i < len(data); i++ {
		h = (h ^ uint32(data[i])) * _P32
	}
	return h
}
// Constants for multiples of sizeof(WORD). The unrolled hash loops
// below consume input in these chunk sizes (4/8/16/32 bytes).
const (
	_WSZ    = 4         // 4
	_DWSZ   = _WSZ << 1 // 8
	_DDWSZ  = _WSZ << 2 // 16
	_DDDWSZ = _WSZ << 3 // 32
)
// Jesteress derivative of FNV1A from [http://www.sanmayce.com/Fastest_Hash/]
// Consumes the input 16 bytes at a time via unsafe word loads, then
// folds in 8-, 4-, and 1-byte tails. The loads read native byte order,
// so results are platform-dependent (presumably little-endian — the
// published test vectors were generated on x86; confirm before reuse).
func Jesteress(data []byte) uint32 {
	h32 := uint32(_OFF32)
	i, dlen := 0, len(data)
	// Main loop over whole 16-byte chunks. NOTE(review): k1 and k2 are
	// 8-byte loads from offsets i and i+4, overlapping by 4 bytes; this
	// exact behavior is pinned by the test vectors in hash_test.go.
	for ; dlen >= _DDWSZ; dlen -= _DDWSZ {
		k1 := *(*uint64)(unsafe.Pointer(&data[i]))
		k2 := *(*uint64)(unsafe.Pointer(&data[i+4]))
		h32 = uint32((uint64(h32) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32)
		i += _DDWSZ
	}
	// Cases: 0,1,2,3,4,5,6,7
	// NOTE(review): only the 8, 4, and 1 bits of the remainder are
	// tested — a trailing 2- or 3-byte run contributes at most its
	// first byte, so some tail bytes never affect the hash. Appears
	// intentional upstream; verify before relying on full coverage.
	if (dlen & _DWSZ) > 0 {
		k1 := *(*uint64)(unsafe.Pointer(&data[i]))
		h32 = uint32(uint64(h32)^k1) * _YP32
		i += _DWSZ
	}
	if (dlen & _WSZ) > 0 {
		k1 := *(*uint32)(unsafe.Pointer(&data[i]))
		h32 = (h32 ^ k1) * _YP32
		i += _WSZ
	}
	if (dlen & 1) > 0 {
		h32 = (h32 ^ uint32(data[i])) * _YP32
	}
	// Final mix spreads entropy into the low bits.
	return h32 ^ (h32 >> 16)
}
// Meiyan derivative of FNV1A from [http://www.sanmayce.com/Fastest_Hash/]
// Same 16-byte main loop as Jesteress; differs only in how an 8-byte
// tail is folded (two overlapping 8-byte loads instead of one).
// Native-byte-order unsafe loads make the result platform-dependent.
func Meiyan(data []byte) uint32 {
	h32 := uint32(_OFF32)
	i, dlen := 0, len(data)
	for ; dlen >= _DDWSZ; dlen -= _DDWSZ {
		k1 := *(*uint64)(unsafe.Pointer(&data[i]))
		k2 := *(*uint64)(unsafe.Pointer(&data[i+4]))
		h32 = uint32((uint64(h32) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32)
		i += _DDWSZ
	}
	// Cases: 0,1,2,3,4,5,6,7
	// NOTE(review): each 8-byte load below advances i by only _WSZ (4),
	// so the two loads overlap by 4 bytes. When exactly 8-11 tail bytes
	// remain, the second load reads up to 4 bytes past the slice data —
	// an out-of-bounds unsafe read. Pinned by test vectors; flag for
	// upstream review rather than changing the output here.
	if (dlen & _DWSZ) > 0 {
		k1 := *(*uint64)(unsafe.Pointer(&data[i]))
		h32 = uint32(uint64(h32)^k1) * _YP32
		i += _WSZ
		k1 = *(*uint64)(unsafe.Pointer(&data[i]))
		h32 = uint32(uint64(h32)^k1) * _YP32
		i += _WSZ
	}
	if (dlen & _WSZ) > 0 {
		k1 := *(*uint32)(unsafe.Pointer(&data[i]))
		h32 = (h32 ^ k1) * _YP32
		i += _WSZ
	}
	if (dlen & 1) > 0 {
		h32 = (h32 ^ uint32(data[i])) * _YP32
	}
	// Final mix spreads entropy into the low bits.
	return h32 ^ (h32 >> 16)
}
// Yorikke derivative of FNV1A from [http://www.sanmayce.com/Fastest_Hash/]
// Runs two interleaved FNV1A-style accumulators (h32, h32b) over 32-byte
// chunks, then merges them at the end. Native-byte-order unsafe loads
// make the result platform-dependent.
func Yorikke(data []byte) uint32 {
	h32 := uint32(_OFF32)
	h32b := uint32(_OFF32)
	i, dlen := 0, len(data)
	// Main loop: each 32-byte chunk feeds 16 bytes to each accumulator.
	for ; dlen >= _DDDWSZ; dlen -= _DDDWSZ {
		k1 := *(*uint64)(unsafe.Pointer(&data[i]))
		k2 := *(*uint64)(unsafe.Pointer(&data[i+4]))
		h32 = uint32((uint64(h32) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32)
		k1 = *(*uint64)(unsafe.Pointer(&data[i+8]))
		k2 = *(*uint64)(unsafe.Pointer(&data[i+12]))
		h32b = uint32((uint64(h32b) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32)
		i += _DDDWSZ
	}
	// 16-byte tail: one 8-byte word per accumulator.
	if (dlen & _DDWSZ) > 0 {
		k1 := *(*uint64)(unsafe.Pointer(&data[i]))
		k2 := *(*uint64)(unsafe.Pointer(&data[i+4]))
		h32 = uint32((uint64(h32) ^ k1) * _YP32)
		h32b = uint32((uint64(h32b) ^ k2) * _YP32)
		i += _DDWSZ
	}
	// Cases: 0,1,2,3,4,5,6,7
	// NOTE(review): k2 is loaded from offset i+2, overlapping k1 by two
	// bytes and leaving bytes i+6..i+7 of an 8-byte tail unread — i+4
	// looks like the intended offset, but the test vectors pin this
	// behavior; do not "fix" without regenerating them.
	if (dlen & _DWSZ) > 0 {
		k1 := *(*uint32)(unsafe.Pointer(&data[i]))
		k2 := *(*uint32)(unsafe.Pointer(&data[i+2]))
		h32 = (h32 ^ k1) * _YP32
		h32b = (h32b ^ k2) * _YP32
		i += _DWSZ
	}
	if (dlen & _WSZ) > 0 {
		k1 := *(*uint32)(unsafe.Pointer(&data[i]))
		h32 = (h32 ^ k1) * _YP32
		i += _WSZ
	}
	if (dlen & 1) > 0 {
		h32 = (h32 ^ uint32(data[i])) * _YP32
	}
	// Merge the second accumulator (rotated left by 5) into the first.
	h32 = (h32 ^ (h32b<<5 | h32b>>27)) * _YP32
	return h32 ^ (h32 >> 16)
}
// Constants defined by the Murmur3 algorithm.
const (
	_C1 = uint32(0xcc9e2d51)
	_C2 = uint32(0x1b873593)
	_F1 = uint32(0x85ebca6b)
	_F2 = uint32(0xc2b2ae35)
)

// M3Seed is a default seed for Murmur3.
const M3Seed = uint32(0x9747b28c)

// Murmur3 Hash [http://code.google.com/p/smhasher/wiki/MurmurHash3]
// Does not generate intermediate objects. The 32-bit block loads use
// native byte order, so results are platform-dependent.
func Murmur3(data []byte, seed uint32) uint32 {
	h := seed
	n := len(data)
	blocked := n &^ 3 // bytes covered by whole 4-byte blocks

	// Body: mix one 32-bit block per iteration.
	pos := 0
	for ; pos < blocked; pos += 4 {
		k := *(*uint32)(unsafe.Pointer(&data[pos]))
		k *= _C1
		k = (k << 15) | (k >> 17)
		k *= _C2
		h ^= k
		h = (h << 13) | (h >> 19)
		h = h*5 + 0xe6546b64
	}

	// Tail: assemble the final 1-3 bytes and mix them in.
	var k uint32
	switch n - pos {
	case 3:
		k |= uint32(data[pos+2]) << 16
		fallthrough
	case 2:
		k |= uint32(data[pos+1]) << 8
		fallthrough
	case 1:
		k |= uint32(data[pos])
		k *= _C1
		k = (k << 15) | (k >> 17)
		k *= _C2
		h ^= k
	}

	// Finalization: fold in the length and avalanche.
	h ^= uint32(n)
	h ^= h >> 16
	h *= _F1
	h ^= h >> 13
	h *= _F2
	h ^= h >> 16
	return h
}

View File

@@ -1,201 +0,0 @@
package hash
import (
"math/rand"
"testing"
)
// keys are the shared fixtures for all hash tests below: two short
// subjects and two longer dotted subjects (31 and 35 bytes), so the
// unrolled chunk loops and tail cases are all exercised.
var keys = [][]byte{
	[]byte("foo"),
	[]byte("bar"),
	[]byte("apcera.continuum.router.foo.bar"),
	[]byte("apcera.continuum.router.foo.bar.baz"),
}
func TestBernstein(t *testing.T) {
results := []uint32{193491849, 193487034, 2487287557, 3139297488}
for i, key := range keys {
h := Bernstein(key)
if h != results[i] {
t.Fatalf("hash is incorrect, expected %d, got %d\n",
results[i], h)
}
}
}
func TestFNV1A(t *testing.T) {
results := []uint32{2851307223, 1991736602, 1990810707, 1244015104}
for i, key := range keys {
h := FNV1A(key)
if h != results[i] {
t.Fatalf("hash is incorrect, expected %d, got %d\n",
results[i], h)
}
}
}
func TestJesteress(t *testing.T) {
results := []uint32{1058908168, 1061739001, 4242539713, 3332038527}
for i, key := range keys {
h := Jesteress(key)
if h != results[i] {
t.Fatalf("hash is incorrect, expected %d, got %d\n",
results[i], h)
}
}
}
func TestMeiyan(t *testing.T) {
results := []uint32{1058908168, 1061739001, 2891236487, 3332038527}
for i, key := range keys {
h := Meiyan(key)
if h != results[i] {
t.Fatalf("hash is incorrect, expected %d, got %d\n",
results[i], h)
}
}
}
func TestYorikke(t *testing.T) {
results := []uint32{3523423968, 2222334353, 407908456, 359111667}
for i, key := range keys {
h := Yorikke(key)
if h != results[i] {
t.Fatalf("hash is incorrect, expected %d, got %d\n",
results[i], h)
}
}
}
func TestMurmur3(t *testing.T) {
results := []uint32{659908353, 522989004, 135963738, 990328005}
for i, key := range keys {
h := Murmur3(key, M3Seed)
if h != results[i] {
t.Fatalf("hash is incorrect, expected %d, got %d\n",
results[i], h)
}
}
}
var ch = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@$#%^&*()")
func sizedBytes(sz int) []byte {
b := make([]byte, sz)
for i := range b {
b[i] = ch[rand.Intn(len(ch))]
}
return b
}
var smlKey = sizedBytes(8)
var medKey = sizedBytes(32)
var lrgKey = sizedBytes(256)
func Benchmark_Bernstein_SmallKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Bernstein(smlKey)
}
}
func Benchmark_Murmur3___SmallKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Murmur3(smlKey, M3Seed)
}
}
func Benchmark_FNV1A_____SmallKey(b *testing.B) {
for i := 0; i < b.N; i++ {
FNV1A(smlKey)
}
}
func Benchmark_Meiyan____SmallKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Meiyan(smlKey)
}
}
func Benchmark_Jesteress_SmallKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Jesteress(smlKey)
}
}
func Benchmark_Yorikke___SmallKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Yorikke(smlKey)
}
}
func Benchmark_Bernstein___MedKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Bernstein(medKey)
}
}
func Benchmark_Murmur3_____MedKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Murmur3(medKey, M3Seed)
}
}
func Benchmark_FNV1A_______MedKey(b *testing.B) {
for i := 0; i < b.N; i++ {
FNV1A(medKey)
}
}
func Benchmark_Meiyan______MedKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Meiyan(medKey)
}
}
func Benchmark_Jesteress___MedKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Jesteress(medKey)
}
}
func Benchmark_Yorikke_____MedKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Yorikke(medKey)
}
}
func Benchmark_Bernstein___LrgKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Bernstein(lrgKey)
}
}
func Benchmark_Murmur3_____LrgKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Murmur3(lrgKey, M3Seed)
}
}
func Benchmark_FNV1A_______LrgKey(b *testing.B) {
for i := 0; i < b.N; i++ {
FNV1A(lrgKey)
}
}
func Benchmark_Meiyan______LrgKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Meiyan(lrgKey)
}
}
func Benchmark_Jesteress___LrgKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Jesteress(lrgKey)
}
}
func Benchmark_Yorikke_____LrgKey(b *testing.B) {
for i := 0; i < b.N; i++ {
Yorikke(lrgKey)
}
}

View File

@@ -1,115 +0,0 @@
NOTE: I used SetBytes(1) to give quick estimate of ops/sec
2013 MacbookAir 11" i7 1.7Ghz Haswell
================
OSX - Mavericks 10.9.2
Go version go1.2.1
================
Benchmark_Bernstein_SmallKey 500000000 5.13 ns/op 195.10 MB/s
Benchmark_Murmur3___SmallKey 200000000 8.11 ns/op 123.26 MB/s
Benchmark_FNV1A_____SmallKey 500000000 5.07 ns/op 197.36 MB/s
Benchmark_Meiyan____SmallKey 500000000 4.24 ns/op 236.02 MB/s
Benchmark_Jesteress_SmallKey 500000000 5.32 ns/op 188.08 MB/s
Benchmark_Yorikke___SmallKey 500000000 5.52 ns/op 181.20 MB/s
Benchmark_Bernstein___MedKey 50000000 34.9 ns/op 28.65 MB/s
Benchmark_Murmur3_____MedKey 100000000 17.9 ns/op 55.94 MB/s
Benchmark_FNV1A_______MedKey 50000000 31.9 ns/op 31.37 MB/s
Benchmark_Meiyan______MedKey 200000000 9.28 ns/op 107.76 MB/s
Benchmark_Jesteress___MedKey 200000000 8.15 ns/op 122.65 MB/s
Benchmark_Yorikke_____MedKey 200000000 9.19 ns/op 108.83 MB/s
2012 MacbookAir 11" i7 2Ghz
================
OSX - Mountain Lion
Go version go1.1
================
Benchmark_Bernstein_SmallKey 500000000 5.19 ns/op 192.58 MB/s
Benchmark_Murmur3___SmallKey 200000000 7.98 ns/op 125.31 MB/s
Benchmark_FNV1A_____SmallKey 500000000 4.89 ns/op 204.30 MB/s
Benchmark_Meiyan____SmallKey 500000000 5.13 ns/op 195.10 MB/s
Benchmark_Jesteress_SmallKey 500000000 5.11 ns/op 195.52 MB/s
Benchmark_Yorikke___SmallKey 500000000 5.43 ns/op 184.01 MB/s
Benchmark_Bernstein___MedKey 50000000 39.10 ns/op 25.60 MB/s
Benchmark_Murmur3_____MedKey 100000000 18.70 ns/op 53.45 MB/s
Benchmark_FNV1A_______MedKey 50000000 32.10 ns/op 31.16 MB/s
Benchmark_Meiyan______MedKey 500000000 7.79 ns/op 128.33 MB/s
Benchmark_Jesteress___MedKey 500000000 7.05 ns/op 141.81 MB/s
Benchmark_Yorikke_____MedKey 500000000 7.51 ns/op 133.19 MB/s
================
OSX - Mountain Lion
Go version go1.0.3
================
go test --bench="." -gcflags="-B"
Benchmark_Bernstein_SmallKey 200000000 9.17 ns/op 109.03 MB/s
Benchmark_Murmur3___SmallKey 200000000 9.68 ns/op 103.27 MB/s
Benchmark_FNV1A_____SmallKey 200000000 9.21 ns/op 108.58 MB/s
Benchmark_Meiyan____SmallKey 500000000 6.34 ns/op 157.82 MB/s
Benchmark_Jesteress_SmallKey 500000000 6.37 ns/op 157.01 MB/s
Benchmark_Yorikke___SmallKey 500000000 6.98 ns/op 143.34 MB/s
Benchmark_Bernstein___MedKey 50000000 53.4 ns/op 18.71 MB/s
Benchmark_Murmur3_____MedKey 100000000 29.8 ns/op 33.56 MB/s
Benchmark_FNV1A_______MedKey 50000000 52.6 ns/op 19.02 MB/s
Benchmark_Meiyan______MedKey 200000000 8.84 ns/op 113.14 MB/s
Benchmark_Jesteress___MedKey 200000000 7.94 ns/op 125.90 MB/s
Benchmark_Yorikke_____MedKey 200000000 9.48 ns/op 105.50 MB/s
================
Ubuntu 12.10
Go version go1.0.3
gcc version 4.7.2 (Ubuntu/Linaro 4.7.2-4precise1)
================
go test --bench="." -gcflags="-B"
Benchmark_Bernstein_SmallKey 200000000 9.90 ns/op 101.06 MB/s
Benchmark_Murmur3___SmallKey 100000000 10.1 ns/op 98.96 MB/s
Benchmark_FNV1A_____SmallKey 200000000 9.29 ns/op 107.59 MB/s
Benchmark_Meiyan____SmallKey 500000000 6.15 ns/op 162.50 MB/s
Benchmark_Jesteress_SmallKey 500000000 6.78 ns/op 147.58 MB/s
Benchmark_Yorikke___SmallKey 500000000 7.17 ns/op 139.49 MB/s
Benchmark_Bernstein___MedKey 50000000 55.0 ns/op 18.18 MB/s
Benchmark_Murmur3_____MedKey 50000000 30.2 ns/op 33.13 MB/s
Benchmark_FNV1A_______MedKey 50000000 56.0 ns/op 17.86 MB/s
Benchmark_Meiyan______MedKey 200000000 9.14 ns/op 109.43 MB/s
Benchmark_Jesteress___MedKey 200000000 8.25 ns/op 121.24 MB/s
Benchmark_Yorikke_____MedKey 200000000 9.72 ns/op 102.91 MB/s
go test --bench="." -compiler gccgo -gccgoflags="-O2" -gcflags="-B"
Benchmark_Bernstein_SmallKey 500000000 4.70 ns/op 212.97 MB/s
Benchmark_Murmur3___SmallKey 200000000 8.18 ns/op 122.21 MB/s
Benchmark_FNV1A_____SmallKey 500000000 5.18 ns/op 193.17 MB/s
Benchmark_Meiyan____SmallKey 500000000 6.21 ns/op 161.13 MB/s
Benchmark_Jesteress_SmallKey 500000000 5.51 ns/op 181.38 MB/s
Benchmark_Yorikke___SmallKey 500000000 7.13 ns/op 140.19 MB/s
Benchmark_Bernstein___MedKey 100000000 27.8 ns/op 35.98 MB/s
Benchmark_Murmur3_____MedKey 100000000 19.1 ns/op 52.46 MB/s
Benchmark_FNV1A_______MedKey 50000000 34.7 ns/op 28.79 MB/s
Benchmark_Meiyan______MedKey 500000000 7.24 ns/op 138.10 MB/s
Benchmark_Jesteress___MedKey 500000000 6.58 ns/op 151.97 MB/s
Benchmark_Yorikke_____MedKey 200000000 9.08 ns/op 110.17 MB/s
go test --bench="." -compiler gccgo -gccgoflags="-O3" -gcflags="-B"
Benchmark_Bernstein_SmallKey 2000000000 1.80 ns/op 555.22 MB/s
Benchmark_Murmur3___SmallKey 200000000 8.07 ns/op 123.86 MB/s
Benchmark_FNV1A_____SmallKey 2000000000 1.89 ns/op 528.93 MB/s
Benchmark_Meiyan____SmallKey 500000000 6.20 ns/op 161.36 MB/s
Benchmark_Jesteress_SmallKey 500000000 5.47 ns/op 182.76 MB/s
Benchmark_Yorikke___SmallKey 500000000 7.11 ns/op 140.58 MB/s
Benchmark_Bernstein___MedKey 100000000 20.3 ns/op 49.18 MB/s
Benchmark_Murmur3_____MedKey 100000000 19.0 ns/op 52.58 MB/s
Benchmark_FNV1A_______MedKey 100000000 20.3 ns/op 49.35 MB/s
Benchmark_Meiyan______MedKey 500000000 6.99 ns/op 143.00 MB/s
Benchmark_Jesteress___MedKey 500000000 6.44 ns/op 155.23 MB/s
Benchmark_Yorikke_____MedKey 200000000 9.01 ns/op 110.98 MB/s

View File

@@ -1,257 +0,0 @@
// Copyright 2012-2015 Apcera Inc. All rights reserved.
// Package hashmap defines a high performance hashmap based on
// fast hashing and fast key comparison. Simple chaining
// is used, relying on the hashing algorithms for good
// distribution.
package hashmap
import (
"bytes"
"errors"
"unsafe"
"github.com/nats-io/gnatsd/hash"
)
// HashMap stores Entry items using a given Hash function.
// The Hash function can be overridden.
type HashMap struct {
	Hash func([]byte) uint32 // hashing function; DefaultHash unless overridden
	bkts []*Entry            // bucket heads; len is always a power of 2
	msk  uint32              // len(bkts)-1, masks a hash into a bucket index
	used uint32              // number of entries currently stored
	rsz  bool                // when true, Set/Remove may grow/shrink bkts
}

// Entry represents what the map is actually storing.
// Uses simple linked list resolution for collisions.
type Entry struct {
	hk   uint32 // cached hash of key; avoids rehashing on resize
	key  []byte
	data interface{}
	next *Entry // next entry in this bucket's collision chain
}

// BucketSize, must be power of 2.
const _BSZ = 8

// Constants for multiples of sizeof(WORD), used by the unrolled key
// comparison in Get.
const (
	_WSZ  = 4         // 4
	_DWSZ = _WSZ << 1 // 8
)

// DefaultHash to be used unless overridden.
var DefaultHash = hash.Jesteress

// Stats are reported on HashMaps.
type Stats struct {
	NumElements uint32  // total entries stored
	NumSlots    uint32  // buckets holding at least one entry
	NumBuckets  uint32  // total number of buckets
	LongChain   uint32  // length of the longest collision chain
	AvgChain    float32 // average chain length over occupied slots
}
// NewWithBkts creates a new HashMap using the bkts slice argument.
// len(bkts) must be a power of 2, otherwise an error is returned.
func NewWithBkts(bkts []*Entry) (*HashMap, error) {
	n := len(bkts)
	// A power of two has exactly one bit set, so n&(n-1) must be zero.
	if n == 0 || n&(n-1) != 0 {
		return nil, errors.New("Size of buckets must be power of 2")
	}
	return &HashMap{
		Hash: DefaultHash,
		bkts: bkts,
		msk:  uint32(n - 1),
		rsz:  true,
	}, nil
}
// New creates a new HashMap of default size and using the default
// Hashing algorithm.
func New() *HashMap {
	// _BSZ is a power of 2, so NewWithBkts cannot fail here and the
	// error is safely discarded.
	h, _ := NewWithBkts(make([]*Entry, _BSZ))
	return h
}
// Set will set the key item to data. This will blindly replace any item
// that may have been at key previous. The key slice is retained, not
// copied, so callers must not mutate it afterwards.
func (h *HashMap) Set(key []byte, data interface{}) {
	hk := h.Hash(key)
	// Compute the bucket index once instead of re-masking the hash on
	// every use (the original recomputed hk&h.msk three times).
	idx := hk & h.msk
	// Scan the collision chain for an existing key and replace in place.
	for e := h.bkts[idx]; e != nil; e = e.next {
		if len(key) == len(e.key) && bytes.Equal(key, e.key) {
			// Success, replace data field
			e.data = data
			return
		}
	}
	// We have a new entry here; push it onto the front of the chain.
	h.bkts[idx] = &Entry{hk: hk, key: key, data: data, next: h.bkts[idx]}
	h.used++
	// Check for resizing once we exceed one entry per bucket on average.
	if h.rsz && h.used > uint32(len(h.bkts)) {
		h.grow()
	}
}
// Get will return the item at key, or nil if the key is not present.
// The key comparison is hand-unrolled with unsafe 8- and 4-byte word
// loads for speed; the length check up front guarantees the word loads
// stay within both slices.
func (h *HashMap) Get(key []byte) interface{} {
	hk := h.Hash(key)
	e := h.bkts[hk&h.msk]
	// FIXME: Reorder on GET if chained?
	// We unroll and optimize the comparison of keys.
	for e != nil {
		i, klen := 0, len(key)
		// Different lengths can never match; skip to the next entry.
		if klen != len(e.key) {
			goto next
		}
		// We unroll and optimize the key comparison here.
		// Compare _DWSZ at a time
		for ; klen >= _DWSZ; klen -= _DWSZ {
			k1 := *(*uint64)(unsafe.Pointer(&key[i]))
			k2 := *(*uint64)(unsafe.Pointer(&e.key[i]))
			if k1 != k2 {
				goto next
			}
			i += _DWSZ
		}
		// Check by _WSZ if applicable
		if (klen & _WSZ) > 0 {
			k1 := *(*uint32)(unsafe.Pointer(&key[i]))
			k2 := *(*uint32)(unsafe.Pointer(&e.key[i]))
			if k1 != k2 {
				goto next
			}
			i += _WSZ
		}
		// Compare what is left over, byte by byte
		for ; i < len(key); i++ {
			if key[i] != e.key[i] {
				goto next
			}
		}
		// Success
		return e.data
	next:
		e = e.next
	}
	return nil
}
// Remove will remove what is associated with key. A no-op if the key
// is not present.
func (h *HashMap) Remove(key []byte) {
	hk := h.Hash(key)
	// Walk the chain via a pointer-to-pointer so the match can be
	// unlinked in place, whether it is the bucket head or mid-chain.
	e := &h.bkts[hk&h.msk]
	for *e != nil {
		if len(key) == len((*e).key) && bytes.Equal(key, (*e).key) {
			// Success
			*e = (*e).next
			h.used--
			// Check for resizing: shrink when occupancy drops below a
			// quarter of the buckets, but never below _BSZ.
			lbkts := uint32(len(h.bkts))
			if h.rsz && lbkts > _BSZ && (h.used < lbkts/4) {
				h.shrink()
			}
			return
		}
		e = &(*e).next
	}
}
// resize is responsible for reallocating the buckets and
// redistributing the hashmap entries. nsz must be a power of 2.
// Entries are copied into one contiguous slice (ents) — presumably
// for allocation count and cache locality; the cached hk field means
// no key is rehashed. Chains are rebuilt front-insertion style, so
// per-bucket order may change.
func (h *HashMap) resize(nsz uint32) {
	nmsk := nsz - 1
	bkts := make([]*Entry, nsz)
	ents := make([]Entry, h.used)
	var ne *Entry
	var i int
	for _, e := range h.bkts {
		for ; e != nil; e = e.next {
			// Take the next pre-allocated Entry and copy the old one in.
			ne, i = &ents[i], i+1
			*ne = *e
			ne.next = bkts[e.hk&nmsk]
			bkts[e.hk&nmsk] = ne
		}
	}
	h.bkts = bkts
	h.msk = nmsk
}
// maxBktSize caps bucket growth at 2^31-1 so the uint32 mask and
// index arithmetic stay valid.
const maxBktSize = (1 << 31) - 1

// grow the HashMap's buckets by 2
func (h *HashMap) grow() {
	// Can't grow beyond maxint for now
	if len(h.bkts) >= maxBktSize {
		return
	}
	h.resize(uint32(2 * len(h.bkts)))
}

// shrink the HashMap's buckets by 2, never going below _BSZ.
func (h *HashMap) shrink() {
	if len(h.bkts) <= _BSZ {
		return
	}
	h.resize(uint32(len(h.bkts) / 2))
}
// Count returns number of elements in the HashMap.
func (h *HashMap) Count() uint32 {
	return h.used
}

// AllKeys will return all the keys stored in the HashMap.
// The returned slices alias the stored keys; they are not copies.
// Order is bucket order and is not stable across resizes.
func (h *HashMap) AllKeys() [][]byte {
	all := make([][]byte, 0, h.used)
	for _, e := range h.bkts {
		for ; e != nil; e = e.next {
			all = append(all, e.key)
		}
	}
	return all
}

// All returns all the Entries in the map (the stored data values).
// Order is bucket order and is not stable across resizes.
func (h *HashMap) All() []interface{} {
	all := make([]interface{}, 0, h.used)
	for _, e := range h.bkts {
		for ; e != nil; e = e.next {
			all = append(all, e.data)
		}
	}
	return all
}
// Stats will collect general statistics about the HashMap: entry and
// bucket counts, the number of occupied slots, the longest collision
// chain, and the average chain length over occupied slots.
func (h *HashMap) Stats() *Stats {
	lc, totalc, slots := 0, 0, 0
	for _, e := range h.bkts {
		if e != nil {
			slots++
		}
		// Measure this bucket's chain length, tracking the maximum.
		i := 0
		for ; e != nil; e = e.next {
			i++
			if i > lc {
				lc = i
			}
		}
		totalc += i
	}
	// Guard the average against an empty map: float32(0)/float32(0)
	// would yield NaN. Report 0 instead.
	var avg float32
	if slots > 0 {
		avg = float32(totalc) / float32(slots)
	}
	return &Stats{
		NumElements: h.used,
		NumBuckets:  uint32(len(h.bkts)),
		LongChain:   uint32(lc),
		AvgChain:    avg,
		NumSlots:    uint32(slots)}
}

View File

@@ -1,348 +0,0 @@
package hashmap
import (
"bytes"
"crypto/rand"
"encoding/hex"
"io"
"testing"
)
func TestMapWithBkts(t *testing.T) {
bkts := make([]*Entry, 3, 3)
_, err := NewWithBkts(bkts)
if err == nil {
t.Fatalf("Buckets size of %d should have failed\n", len(bkts))
}
bkts = make([]*Entry, 8, 8)
_, err = NewWithBkts(bkts)
if err != nil {
t.Fatalf("Buckets size of %d should have succeeded\n", len(bkts))
}
}
var foo = []byte("foo")
var bar = []byte("bar")
var baz = []byte("baz")
var med = []byte("foo.bar.baz")
var sub = []byte("apcera.continuum.router.foo.bar.baz")
func TestHashMapBasics(t *testing.T) {
h := New()
if h.used != 0 {
t.Fatalf("Wrong number of entries: %d vs 0\n", h.used)
}
h.Set(foo, bar)
if h.used != 1 {
t.Fatalf("Wrong number of entries: %d vs 1\n", h.used)
}
if v := h.Get(foo).([]byte); !bytes.Equal(v, bar) {
t.Fatalf("Did not receive correct answer: '%s' vs '%s'\n", bar, v)
}
h.Remove(foo)
if h.used != 0 {
t.Fatalf("Wrong number of entries: %d vs 0\n", h.used)
}
if v := h.Get(foo); v != nil {
t.Fatal("Did not receive correct answer, should be nil")
}
}
const (
INS = 100
EXP = 128
REM = 75
EXP2 = 64
)
func TestGrowing(t *testing.T) {
h := New()
if len(h.bkts) != _BSZ {
t.Fatalf("Initial bucket size is wrong: %d vs %d\n", len(h.bkts), _BSZ)
}
// Create _INBOX style end tokens
var toks [INS][]byte
for i := range toks {
u := make([]byte, 13)
io.ReadFull(rand.Reader, u)
toks[i] = []byte(hex.EncodeToString(u))
h.Set(toks[i], toks[i])
tg := h.Get(toks[i]).([]byte)
if !bytes.Equal(tg, toks[i]) {
t.Fatalf("Did not match properly, '%s' vs '%s'\n", tg, toks[i])
}
}
if len(h.bkts) != EXP {
t.Fatalf("Expanded bucket size is wrong: %d vs %d\n", len(h.bkts), EXP)
}
}
func TestHashMapCollisions(t *testing.T) {
h := New()
h.rsz = false
// Create _INBOX style end tokens
var toks [INS][]byte
for i := range toks {
u := make([]byte, 13)
io.ReadFull(rand.Reader, u)
toks[i] = []byte(hex.EncodeToString(u))
h.Set(toks[i], toks[i])
tg := h.Get(toks[i]).([]byte)
if !bytes.Equal(tg, toks[i]) {
t.Fatalf("Did not match properly, '%s' vs '%s'\n", tg, toks[i])
}
}
if len(h.bkts) != _BSZ {
t.Fatalf("Bucket size is wrong: %d vs %d\n", len(h.bkts), _BSZ)
}
h.grow()
if len(h.bkts) != 2*_BSZ {
t.Fatalf("Bucket size is wrong: %d vs %d\n", len(h.bkts), 2*_BSZ)
}
ti := 32
tg := h.Get(toks[ti]).([]byte)
if !bytes.Equal(tg, toks[ti]) {
t.Fatalf("Did not match properly, '%s' vs '%s'\n", tg, toks[ti])
}
h.Remove(toks[99])
rg := h.Get(toks[99])
if rg != nil {
t.Fatalf("After remove should have been nil! '%s'\n", rg.([]byte))
}
}
func TestAll(t *testing.T) {
h := New()
h.Set([]byte("1"), 1)
h.Set([]byte("2"), 1)
h.Set([]byte("3"), 1)
all := h.All()
if len(all) != 3 {
t.Fatalf("Expected All() to return 3, but got %d\n", len(all))
}
allkeys := h.AllKeys()
if len(allkeys) != 3 {
t.Fatalf("Expected All() to return 3, but got %d\n", len(allkeys))
}
}
func TestSetDoesReplaceOnExisting(t *testing.T) {
h := New()
k := []byte("key")
h.Set(k, "foo")
h.Set(k, "bar")
all := h.All()
if len(all) != 1 {
t.Fatalf("Set should replace, expected 1 vs %d\n", len(all))
}
s, ok := all[0].(string)
if !ok {
t.Fatalf("Value is incorrect: %v\n", all[0].(string))
}
if s != "bar" {
t.Fatalf("Value is incorrect, expected 'bar' vs '%s'\n", s)
}
}
func TestCollision(t *testing.T) {
h := New()
k1 := []byte("999")
k2 := []byte("1000")
h.Set(k1, "foo")
h.Set(k2, "bar")
all := h.All()
if len(all) != 2 {
t.Fatalf("Expected 2 vs %d\n", len(all))
}
if h.Get(k1) == nil {
t.Fatalf("Failed to get '999'\n")
}
}
func TestHashMapStats(t *testing.T) {
h := New()
h.rsz = false
// Create _INBOX style end tokens
var toks [INS][]byte
for i := range toks {
u := make([]byte, 13)
io.ReadFull(rand.Reader, u)
toks[i] = []byte(hex.EncodeToString(u))
h.Set(toks[i], toks[i])
tg := h.Get(toks[i]).([]byte)
if !bytes.Equal(tg, toks[i]) {
t.Fatalf("Did not match properly, '%s' vs '%s'\n", tg, toks[i])
}
}
s := h.Stats()
if s.NumElements != INS {
t.Fatalf("NumElements incorrect: %d vs %d\n", s.NumElements, INS)
}
if s.NumBuckets != _BSZ {
t.Fatalf("NumBuckets incorrect: %d vs %d\n", s.NumBuckets, _BSZ)
}
if s.AvgChain > 13 || s.AvgChain < 12 {
t.Fatalf("AvgChain out of bounds: %f vs %f\n", s.AvgChain, 12.5)
}
if s.LongChain > 25 {
t.Fatalf("LongChain out of bounds: %d vs %d\n", s.LongChain, 22)
}
}
func TestShrink(t *testing.T) {
h := New()
if len(h.bkts) != _BSZ {
t.Fatalf("Initial bucket size is wrong: %d vs %d\n", len(h.bkts), _BSZ)
}
// Create _INBOX style end tokens
var toks [INS][]byte
for i := range toks {
u := make([]byte, 13)
io.ReadFull(rand.Reader, u)
toks[i] = []byte(hex.EncodeToString(u))
h.Set(toks[i], toks[i])
tg := h.Get(toks[i]).([]byte)
if !bytes.Equal(tg, toks[i]) {
t.Fatalf("Did not match properly, '%s' vs '%s'\n", tg, toks[i])
}
}
if len(h.bkts) != EXP {
t.Fatalf("Expanded bucket size is wrong: %d vs %d\n", len(h.bkts), EXP)
}
for i := 0; i < REM; i++ {
h.Remove(toks[i])
}
if len(h.bkts) != EXP2 {
t.Fatalf("Shrunk bucket size is wrong: %d vs %d\n", len(h.bkts), EXP2)
}
}
func TestFalseLookup(t *testing.T) {
h := New()
// DW + W
h.Set([]byte("cache.test.0"), "foo")
v := h.Get([]byte("cache.test.1"))
if v != nil {
t.Fatalf("Had a match when did not expect one!\n")
}
// DW + W + 3
h.Set([]byte("cache.test.1234"), "foo")
v = h.Get([]byte("cache.test.0000"))
if v != nil {
t.Fatalf("Had a match when did not expect one!\n")
}
}
func TestRemoveRandom(t *testing.T) {
h := New()
h.RemoveRandom()
h.Set(foo, "1")
h.Set(bar, "1")
h.Set(baz, "1")
if h.Count() != 3 {
t.Fatalf("Expected 3 members, got %d\n", h.Count())
}
h.RemoveRandom()
if h.Count() != 2 {
t.Fatalf("Expected 2 members, got %d\n", h.Count())
}
}
func Benchmark_GoMap___GetSmallKey(b *testing.B) {
b.StopTimer()
m := make(map[string][]byte)
m["foo"] = bar
b.StartTimer()
for i := 0; i < b.N; i++ {
_ = m["foo"]
}
}
func Benchmark_HashMap_GetSmallKey(b *testing.B) {
b.StopTimer()
m := New()
m.Set(foo, bar)
b.StartTimer()
for i := 0; i < b.N; i++ {
_ = m.Get(foo)
}
}
func Benchmark_GoMap____GetMedKey(b *testing.B) {
b.StopTimer()
ts := string(med)
m := make(map[string][]byte)
m[ts] = bar
b.StartTimer()
for i := 0; i < b.N; i++ {
_ = m[ts]
}
}
func Benchmark_HashMap__GetMedKey(b *testing.B) {
b.StopTimer()
m := New()
m.Set(sub, bar)
b.StartTimer()
for i := 0; i < b.N; i++ {
_ = m.Get(med)
}
}
func Benchmark_GoMap____GetLrgKey(b *testing.B) {
b.StopTimer()
ts := string(sub)
m := make(map[string][]byte)
m[ts] = bar
b.StartTimer()
for i := 0; i < b.N; i++ {
_ = m[ts]
}
}
func Benchmark_HashMap__GetLrgKey(b *testing.B) {
b.StopTimer()
m := New()
m.Set(sub, bar)
b.StartTimer()
for i := 0; i < b.N; i++ {
_ = m.Get(sub)
}
}
func Benchmark_GoMap_________Set(b *testing.B) {
b.StopTimer()
m := make(map[string][]byte)
b.StartTimer()
for i := 0; i < b.N; i++ {
m["foo"] = bar
}
}
func Benchmark_HashMap_______Set(b *testing.B) {
b.StopTimer()
m := New()
b.StartTimer()
for i := 0; i < b.N; i++ {
m.Set(foo, bar)
}
}

View File

@@ -1,43 +0,0 @@
// Copyright 2012-2013 Apcera Inc. All rights reserved.
package hashmap
import (
"math/rand"
"time"
)
// We use init to setup the random number generator used by
// RemoveRandom. Seeding the global math/rand source here affects the
// whole process, not just this package.
func init() {
	rand.Seed(time.Now().UnixNano())
}

// RemoveRandom can be used for a random policy eviction.
// This is stochastic but very fast and does not impede
// performance like LRU, LFU or even ARC based implementations.
// It picks a random bucket index and removes the head entry of the
// first non-empty bucket at or after it (wrapping to a backward scan),
// so eviction is biased by bucket layout rather than uniform.
func (h *HashMap) RemoveRandom() {
	if h.used == 0 {
		return
	}
	index := (rand.Int()) & int(h.msk)
	// Walk forward til we find an entry
	for i := index; i < len(h.bkts); i++ {
		e := &h.bkts[i]
		if *e != nil {
			*e = (*e).next
			h.used--
			return
		}
	}
	// If we are here we hit end and did not remove anything,
	// use the index and walk backwards.
	for i := index; i >= 0; i-- {
		e := &h.bkts[i]
		if *e != nil {
			*e = (*e).next
			h.used--
			return
		}
	}
	// The two scans above cover every bucket, so with used > 0 this
	// should be unreachable absent concurrent mutation.
	panic("Should not reach here..")
}

View File

@@ -1,52 +0,0 @@
2015 iMac 5k 4Ghz i7
MacOSX 10.11.2
====================
Go version go1.5.2
====================
PASS
Benchmark_GoMap___GetSmallKey-8 300000000 5.06 ns/op 197.63 mops/s
Benchmark_HashMap_GetSmallKey-8 100000000 10.6 ns/op 94.34 mops/s
Benchmark_GoMap____GetMedKey-8 300000000 5.09 ns/op 196.46 mops/s
Benchmark_HashMap__GetMedKey-8 200000000 6.75 ns/op 148.15 mops/s
Benchmark_GoMap____GetLrgKey-8 300000000 4.88 ns/op 204.91 mops/s
Benchmark_HashMap__GetLrgKey-8 100000000 17.8 ns/op 56.18 mops/s
Benchmark_GoMap_________Set-8 50000000 26.3 ns/op 38.02 mops/s
Benchmark_HashMap_______Set-8 20000000 82.4 ns/op 12.13 mops/s
2013 MacbookAir 11" i7 1.7Ghz Haswell
Linux mint15 3.8.0-19
====================
Go version go1.2.1
====================
Benchmark_GoMap___GetSmallKey 500000000 7.57 ns/op 132.05 mops/s
Benchmark_HashMap_GetSmallKey 100000000 14.30 ns/op 70.08 mops/s
Benchmark_GoMap____GetMedKey 500000000 4.83 ns/op 207.01 mops/s
Benchmark_HashMap__GetMedKey 200000000 9.54 ns/op 104.82 mops/s
Benchmark_GoMap____GetLrgKey 500000000 4.39 ns/op 227.79 mops/s
Benchmark_HashMap__GetLrgKey 100000000 24.50 ns/op 40.77 mops/s
====================
Go version go1.2.1
====================
Benchmark_GoMap___GetSmallKey 200000000 8.77 ns/op 114.02 mops/s
Benchmark_HashMap_GetSmallKey 100000000 14.80 ns/op 67.53 mops/s
Benchmark_GoMap____GetMedKey 500000000 6.21 ns/op 161.05 mops/s
Benchmark_HashMap__GetMedKey 200000000 9.51 ns/op 105.15 mops/s
Benchmark_GoMap____GetLrgKey 100000000 18.30 ns/op 54.68 mops/s
Benchmark_HashMap__GetLrgKey 100000000 24.80 ns/op 40.36 mops/s
====================
Go version go1.0.3
====================
Benchmark_GoMap___GetSmallKey 50000000 52.20 ns/op 19.17 mops/s
Benchmark_HashMap_GetSmallKey 100000000 15.50 ns/op 64.34 mops/s
Benchmark_GoMap____GetMedKey 50000000 61.60 ns/op 16.24 mops/s
Benchmark_HashMap__GetMedKey 200000000 8.91 ns/op 112.20 mops/s
Benchmark_GoMap____GetLrgKey 20000000 86.90 ns/op 11.51 mops/s
Benchmark_HashMap__GetLrgKey 100000000 25.40 ns/op 39.44 mops/s

View File

@@ -5,11 +5,8 @@ rm -rf ./cov
mkdir cov
go test -v -covermode=atomic -coverprofile=./cov/auth.out ./auth
go test -v -covermode=atomic -coverprofile=./cov/conf.out ./conf
go test -v -covermode=atomic -coverprofile=./cov/hash.out ./hash
go test -v -covermode=atomic -coverprofile=./cov/hashmap.out ./hashmap
go test -v -covermode=atomic -coverprofile=./cov/log.out ./logger
go test -v -covermode=atomic -coverprofile=./cov/server.out ./server
go test -v -covermode=atomic -coverprofile=./cov/sublist.out ./sublist
go test -v -covermode=atomic -coverprofile=./cov/test.out ./test
gocovmerge ./cov/*.out > acc.out
rm -rf ./cov

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2015 Apcera Inc. All rights reserved.
// Copyright 2012-2016 Apcera Inc. All rights reserved.
package server
@@ -11,9 +11,6 @@ import (
"sync"
"sync/atomic"
"time"
"github.com/nats-io/gnatsd/hashmap"
"github.com/nats-io/gnatsd/sublist"
)
func init() {
@@ -48,7 +45,7 @@ type client struct {
ncs string
bw *bufio.Writer
srv *Server
subs *hashmap.HashMap
subs map[string]*subscription
pcd map[*client]struct{}
atmr *time.Timer
ptmr *time.Timer
@@ -102,7 +99,7 @@ func (c *client) initClient(tlsConn bool) {
s := c.srv
c.cid = atomic.AddUint64(&s.gcid, 1)
c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize)
c.subs = hashmap.New()
c.subs = make(map[string]*subscription)
c.debug = (atomic.LoadInt32(&debug) != 0)
c.trace = (atomic.LoadInt32(&trace) != 0)
@@ -457,7 +454,7 @@ func (c *client) processPub(arg []byte) error {
return ErrMaxPayload
}
if c.opts.Pedantic && !sublist.IsValidLiteralSubject(c.pa.subject) {
if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) {
c.sendErr("Invalid Subject")
}
return nil
@@ -516,12 +513,13 @@ func (c *client) processSub(argo []byte) (err error) {
// We can have two SUB protocols coming from a route due to some
// race conditions. We should make sure that we process only one.
if c.subs.Get(sub.sid) == nil {
c.subs.Set(sub.sid, sub)
sid := string(sub.sid)
if c.subs[sid] == nil {
c.subs[sid] = sub
if c.srv != nil {
err = c.srv.sl.Insert(sub.subject, sub)
err = c.srv.sl.Insert(sub)
if err != nil {
c.subs.Remove(sub.sid)
delete(c.subs, sid)
} else {
shouldForward = c.typ != ROUTER
}
@@ -549,9 +547,9 @@ func (c *client) unsubscribe(sub *subscription) {
return
}
c.traceOp("<-> %s", "DELSUB", sub.sid)
c.subs.Remove(sub.sid)
delete(c.subs, string(sub.sid))
if c.srv != nil {
c.srv.sl.Remove(sub.subject, sub)
c.srv.sl.Remove(sub)
}
}
@@ -578,7 +576,7 @@ func (c *client) processUnsub(arg []byte) error {
ok := false
c.mu.Lock()
if sub, ok = (c.subs.Get(sid)).(*subscription); ok {
if sub, ok = c.subs[string(sid)]; ok {
if max > 0 {
sub.max = int64(max)
} else {
@@ -747,7 +745,7 @@ func (c *client) processMsg(msg []byte) {
atomic.AddInt64(&srv.inMsgs, 1)
atomic.AddInt64(&srv.inBytes, msgSize)
r := srv.sl.Match(c.pa.subject)
r := srv.sl.Match(string(c.pa.subject))
if len(r) <= 0 {
return
}
@@ -779,9 +777,7 @@ func (c *client) processMsg(msg []byte) {
var qmap map[string][]*subscription
// Loop over all subscriptions that match.
for _, v := range r {
sub := v.(*subscription)
for _, sub := range r {
// Process queue group subscriptions by gathering them all up
// here. We will pick the winners when we are done processing
// all of the subscriptions.
@@ -958,7 +954,10 @@ func (c *client) closeConnection() {
c.nc = nil
// Snapshot for use.
subs := c.subs.All()
subs := make([]*subscription, 0, len(c.subs))
for _, sub := range c.subs {
subs = append(subs, sub)
}
srv := c.srv
retryImplicit := false
@@ -973,14 +972,12 @@ func (c *client) closeConnection() {
srv.removeClient(c)
// Remove clients subscriptions.
for _, s := range subs {
if sub, ok := s.(*subscription); ok {
srv.sl.Remove(sub.subject, sub)
// Forward on unsubscribes if we are not
// a router ourselves.
if c.typ != ROUTER {
srv.broadcastUnSubscribe(sub)
}
for _, sub := range subs {
srv.sl.Remove(sub)
// Forward on unsubscribes if we are not
// a router ourselves.
if c.typ != ROUTER {
srv.broadcastUnSubscribe(sub)
}
}
}

View File

@@ -1,3 +1,5 @@
// Copyright 2012-2016 Apcera Inc. All rights reserved.
package server
import (
@@ -436,9 +438,8 @@ func TestClientAutoUnsubExactReceived(t *testing.T) {
<-ch
// We should not have any subscriptions in place here.
if c.subs.Count() != 0 {
t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n",
c.subs.Count())
if len(c.subs) != 0 {
t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n", len(c.subs))
}
}
@@ -466,9 +467,8 @@ func TestClientUnsubAfterAutoUnsub(t *testing.T) {
<-ch
// We should not have any subscriptions in place here.
if c.subs.Count() != 0 {
t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n",
c.subs.Count())
if len(c.subs) != 0 {
t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n", len(c.subs))
}
}

View File

@@ -13,8 +13,6 @@ import (
"strconv"
"sync/atomic"
"time"
"github.com/nats-io/gnatsd/sublist"
)
// Snapshot this
@@ -100,7 +98,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
case byCid:
pairs[i] = Pair{Key: client, Val: int64(client.cid)}
case bySubs:
pairs[i] = Pair{Key: client, Val: int64(client.subs.Count())}
pairs[i] = Pair{Key: client, Val: int64(len(client.subs))}
case byPending:
pairs[i] = Pair{Key: client, Val: int64(client.bw.Buffered())}
case byOutMsgs:
@@ -167,7 +165,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
ci.Idle = myUptime(c.Now.Sub(client.last))
ci.OutMsgs = client.outMsgs
ci.OutBytes = client.outBytes
ci.NumSubs = client.subs.Count()
ci.NumSubs = uint32(len(client.subs))
ci.Pending = client.bw.Buffered()
ci.Name = client.opts.Name
ci.Lang = client.opts.Lang
@@ -215,7 +213,11 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
}
if subs == 1 {
ci.Subs = castToSliceString(client.subs.All())
sublist := make([]*subscription, 0, len(client.subs))
for _, sub := range client.subs {
sublist = append(sublist, sub)
}
ci.Subs = castToSliceString(sublist)
}
client.mu.Unlock()
@@ -231,17 +233,17 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
ResponseHandler(w, r, b)
}
func castToSliceString(input []interface{}) []string {
func castToSliceString(input []*subscription) []string {
output := make([]string, 0, len(input))
for _, line := range input {
output = append(output, string(line.(*subscription).subject))
output = append(output, string(line.subject))
}
return output
}
// Subsz represents detail information on current connections.
type Subsz struct {
*sublist.Stats
*SublistStats
}
// Routez represents detailed information on current client connections.
@@ -292,11 +294,15 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
OutMsgs: r.outMsgs,
InBytes: r.inBytes,
OutBytes: r.outBytes,
NumSubs: r.subs.Count(),
NumSubs: uint32(len(r.subs)),
}
if subs == 1 {
ri.Subs = castToSliceString(r.subs.All())
sublist := make([]*subscription, 0, len(r.subs))
for _, sub := range r.subs {
sublist = append(sublist, sub)
}
ri.Subs = castToSliceString(sublist)
}
r.mu.Unlock()
@@ -325,7 +331,6 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) {
s.mu.Unlock()
st := &Subsz{s.sl.Stats()}
b, err := json.MarshalIndent(st, "", " ")
if err != nil {
Errorf("Error marshalling response to /subscriptionsz request: %v", err)

View File

@@ -235,14 +235,15 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
s.mu.Lock()
for _, client := range s.clients {
client.mu.Lock()
subs := client.subs.All()
subs := make([]*subscription, 0, len(client.subs))
for _, sub := range client.subs {
subs = append(subs, sub)
}
client.mu.Unlock()
for _, s := range subs {
if sub, ok := s.(*subscription); ok {
rsid := routeSid(sub)
proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid)
b.WriteString(proto)
}
for _, sub := range subs {
rsid := routeSid(sub)
proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid)
b.WriteString(proto)
}
}
s.mu.Unlock()
@@ -411,7 +412,7 @@ func (s *Server) routeSidQueueSubscriber(rsid []byte) (*subscription, bool) {
}
sid := matches[RSID_SID_INDEX]
if sub, ok := (client.subs.Get(sid)).(*subscription); ok {
if sub, ok := client.subs[string(sid)]; ok {
return sub, true
}
return nil, true

View File

@@ -19,8 +19,6 @@ import (
// Allow dynamic profiling.
_ "net/http/pprof"
"github.com/nats-io/gnatsd/sublist"
)
// Info is the information sent to clients to help them understand information
@@ -47,7 +45,7 @@ type Server struct {
mu sync.Mutex
info Info
infoJSON []byte
sl *sublist.Sublist
sl *Sublist
opts *Options
cAuth Auth
rAuth Auth
@@ -101,7 +99,7 @@ func New(opts *Options) *Server {
s := &Server{
info: info,
sl: sublist.New(),
sl: NewSublist(),
opts: opts,
debug: opts.Debug,
trace: opts.Trace,
@@ -697,9 +695,9 @@ func (s *Server) NumClients() int {
// NumSubscriptions will report how many subscriptions are active.
func (s *Server) NumSubscriptions() uint32 {
s.mu.Lock()
defer s.mu.Unlock()
stats := s.sl.Stats()
return stats.NumSubs
subs := s.sl.Count()
s.mu.Unlock()
return subs
}
// Addr will return the net.Addr object for the current listener.

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2015 Apcera Inc. All rights reserved.
// Copyright 2012-2016 Apcera Inc. All rights reserved.
package server
@@ -6,9 +6,6 @@ import (
"bytes"
"net"
"testing"
"github.com/nats-io/gnatsd/hashmap"
"github.com/nats-io/gnatsd/sublist"
)
func TestSplitBufferSubOp(t *testing.T) {
@@ -16,8 +13,8 @@ func TestSplitBufferSubOp(t *testing.T) {
defer cli.Close()
defer trash.Close()
s := &Server{sl: sublist.New()}
c := &client{srv: s, subs: hashmap.New(), nc: cli}
s := &Server{sl: NewSublist()}
c := &client{srv: s, subs: make(map[string]*subscription), nc: cli}
subop := []byte("SUB foo 1\r\n")
subop1 := subop[:6]
@@ -35,11 +32,11 @@ func TestSplitBufferSubOp(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
r := s.sl.Match([]byte("foo"))
r := s.sl.Match("foo")
if r == nil || len(r) != 1 {
t.Fatalf("Did not match subscription properly: %+v\n", r)
}
sub := r[0].(*subscription)
sub := r[0]
if !bytes.Equal(sub.subject, []byte("foo")) {
t.Fatalf("Subject did not match expected 'foo' : '%s'\n", sub.subject)
}
@@ -52,8 +49,8 @@ func TestSplitBufferSubOp(t *testing.T) {
}
func TestSplitBufferUnsubOp(t *testing.T) {
s := &Server{sl: sublist.New()}
c := &client{srv: s, subs: hashmap.New()}
s := &Server{sl: NewSublist()}
c := &client{srv: s, subs: make(map[string]*subscription)}
subop := []byte("SUB foo 1024\r\n")
if err := c.parse(subop); err != nil {
@@ -79,14 +76,14 @@ func TestSplitBufferUnsubOp(t *testing.T) {
if c.state != OP_START {
t.Fatalf("Expected OP_START state vs %d\n", c.state)
}
r := s.sl.Match([]byte("foo"))
r := s.sl.Match("foo")
if r != nil && len(r) != 0 {
t.Fatalf("Should be no subscriptions in results: %+v\n", r)
}
}
func TestSplitBufferPubOp(t *testing.T) {
c := &client{subs: hashmap.New()}
c := &client{subs: make(map[string]*subscription)}
pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r")
pub1 := pub[:2]
pub2 := pub[2:9]
@@ -152,7 +149,7 @@ func TestSplitBufferPubOp(t *testing.T) {
}
func TestSplitBufferPubOp2(t *testing.T) {
c := &client{subs: hashmap.New()}
c := &client{subs: make(map[string]*subscription)}
pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r\n")
pub1 := pub[:30]
pub2 := pub[30:]
@@ -172,7 +169,7 @@ func TestSplitBufferPubOp2(t *testing.T) {
}
func TestSplitBufferPubOp3(t *testing.T) {
c := &client{subs: hashmap.New()}
c := &client{subs: make(map[string]*subscription)}
pubAll := []byte("PUB foo bar 11\r\nhello world\r\n")
pub := pubAll[:16]
@@ -198,7 +195,7 @@ func TestSplitBufferPubOp3(t *testing.T) {
}
func TestSplitBufferPubOp4(t *testing.T) {
c := &client{subs: hashmap.New()}
c := &client{subs: make(map[string]*subscription)}
pubAll := []byte("PUB foo 11\r\nhello world\r\n")
pub := pubAll[:12]
@@ -224,7 +221,7 @@ func TestSplitBufferPubOp4(t *testing.T) {
}
func TestSplitBufferPubOp5(t *testing.T) {
c := &client{subs: hashmap.New()}
c := &client{subs: make(map[string]*subscription)}
pubAll := []byte("PUB foo 11\r\nhello world\r\n")
// Splits need to be on MSG_END now too, so make sure we check that.
@@ -243,7 +240,7 @@ func TestSplitBufferPubOp5(t *testing.T) {
}
func TestSplitConnectArg(t *testing.T) {
c := &client{subs: hashmap.New()}
c := &client{subs: make(map[string]*subscription)}
connectAll := []byte("CONNECT {\"verbose\":false,\"ssl_required\":false," +
"\"user\":\"test\",\"pedantic\":true,\"pass\":\"pass\"}\r\n")
@@ -291,7 +288,7 @@ func TestSplitConnectArg(t *testing.T) {
}
func TestSplitDanglingArgBuf(t *testing.T) {
c := &client{subs: hashmap.New()}
c := &client{subs: make(map[string]*subscription)}
// We test to make sure we do not dangle any argBufs after processing
// since that could lead to performance issues.
@@ -373,7 +370,7 @@ func TestSplitMsgArg(t *testing.T) {
}
func TestSplitBufferMsgOp(t *testing.T) {
c := &client{subs: hashmap.New(), typ: ROUTER}
c := &client{subs: make(map[string]*subscription), typ: ROUTER}
msg := []byte("MSG foo.bar QRSID:15:3 _INBOX.22 11\r\nhello world\r")
msg1 := msg[:2]
msg2 := msg[2:9]

532
server/sublist.go Normal file
View File

@@ -0,0 +1,532 @@
// Copyright 2016 Apcera Inc. All rights reserved.
// Package sublist is a routing mechanism to handle subject distribution
// and provides a facility to match subjects from published messages to
// interested subscribers. Subscribers can have wildcard subjects to match
// multiple published subjects.
package server
import (
"errors"
"strings"
"sync"
"sync/atomic"
)
// Wildcard constants and the token separator used in subjects.
// pwc ('*') matches exactly one token; fwc ('>') matches one or more
// trailing tokens and must be the final token of a subject.
const (
    pwc = '*'
    fwc = '>'
    tsep = "."
    btsep = '.'
)

// Sublist related errors.
var (
    ErrInvalidSubject = errors.New("sublist: Invalid Subject")
    ErrNotFound = errors.New("sublist: No Matches Found")
)

// slCacheMax bounds the number of entries kept in the front-end
// match-result cache (see Match).
const slCacheMax = 1024
// A Sublist stores and efficiently retrieves subscriptions. It uses a
// trie keyed on subject tokens plus a bounded cache of recent match
// results. The counters below may be bumped atomically while only the
// read lock is held (see Match), so reads of them for reporting take
// the write lock (see Stats).
type Sublist struct {
    sync.RWMutex
    matches uint64 // total Match calls
    cacheHits uint64 // Match calls answered from the cache
    inserts uint64 // successful Insert calls
    removes uint64 // successful Remove calls
    cache map[string][]*subscription // literal subject -> cached results
    root *level // top level of the trie
    count uint32 // number of subscriptions currently stored
}
// A node contains the subscriptions terminating at this token and a
// pointer to the next (deeper) level of the trie.
type node struct {
    next *level
    subs []*subscription
}

// A level represents one depth of the trie: a map from literal token
// to node, plus dedicated pointers for the '*' and '>' wildcard nodes.
type level struct {
    nodes map[string]*node
    pwc, fwc *node
}
// newNode returns an empty node with a small pre-sized subscription list.
func newNode() *node {
    n := &node{}
    n.subs = make([]*subscription, 0, 4)
    return n
}
// newLevel returns an empty level backed by a plain Go map keyed on
// subject tokens. (The FNV1A note from the earlier custom-hashmap
// implementation no longer applies; the built-in map handles hashing.)
func newLevel() *level {
    return &level{nodes: make(map[string]*node)}
}
// NewSublist creates an empty Sublist with an initialized trie root and
// match cache.
func NewSublist() *Sublist {
    return &Sublist{
        root:  newLevel(),
        cache: make(map[string][]*subscription),
    }
}
// Insert adds a subscription into the sublist. The subscription's
// subject may contain wildcards ('*' for one token, '>' as a terminal
// multi-token match); an empty token, or any token following '>',
// yields ErrInvalidSubject.
func (s *Sublist) Insert(sub *subscription) error {
    // Converting to string copies the bytes, detaching the key we keep
    // from the (possibly large) buffer backing sub.subject. The extra
    // append-copy the original code did here was redundant: the string
    // conversion already allocates its own copy.
    subject := string(sub.subject)

    // Split into tokens on '.'. The stack array avoids a heap
    // allocation for subjects with up to 32 tokens.
    tsa := [32]string{}
    tokens := tsa[:0]
    start := 0
    for i := 0; i < len(subject); i++ {
        if subject[i] == btsep {
            tokens = append(tokens, subject[start:i])
            start = i + 1
        }
    }
    tokens = append(tokens, subject[start:])

    s.Lock()

    sfwc := false
    l := s.root
    var n *node

    for _, t := range tokens {
        // Empty token, or any token after a full wildcard, is invalid.
        if len(t) == 0 || sfwc {
            s.Unlock()
            return ErrInvalidSubject
        }
        switch t[0] {
        case pwc:
            n = l.pwc
        case fwc:
            n = l.fwc
            sfwc = true
        default:
            n = l.nodes[t]
        }
        // Create the node (and its slot at this level) on first use.
        if n == nil {
            n = newNode()
            switch t[0] {
            case pwc:
                l.pwc = n
            case fwc:
                l.fwc = n
            default:
                l.nodes[t] = n
            }
        }
        if n.next == nil {
            n.next = newLevel()
        }
        l = n.next
    }

    // n is the node reached by the final token; attach the sub there.
    n.subs = append(n.subs, sub)

    s.count++
    s.inserts++

    // Fold the new sub into any cached result sets it affects.
    s.addToCache(subject, sub)
    s.Unlock()
    return nil
}
// addToCache folds a newly inserted subscription into any cached result
// sets whose literal subject it matches. Assumes the write lock is held.
func (s *Sublist) addToCache(subject string, sub *subscription) {
    for key, entries := range s.cache {
        if !matchLiteral(key, subject) {
            continue
        }
        // Clone before appending: callers of Match may still hold a
        // reference to the existing slice.
        updated := make([]*subscription, len(entries), len(entries)+1)
        copy(updated, entries)
        s.cache[key] = append(updated, sub)
    }
}
// removeFromCache invalidates any cached result sets that could contain
// the removed subscription. Assumes the write lock is held.
func (s *Sublist) removeFromCache(subject string, sub *subscription) {
    for k := range s.cache {
        if !matchLiteral(k, subject) {
            continue
        }
        // Since someone else may be referencing the cached slice, we
        // can't modify it safely in place; just drop the entry and let
        // it re-populate on the next Match.
        delete(s.cache, k)
    }
}
// Match returns all subscriptions whose subject pattern matches the
// given literal subject. Results are memoized in a bounded cache keyed
// by subject; a cached slice may be shared with other callers and must
// not be mutated. Note that queue subscribers will only have one member
// selected and returned for each queue group.
func (s *Sublist) Match(subject string) []*subscription {
    s.RLock()
    // Atomic because only the read lock is held here.
    atomic.AddUint64(&s.matches, 1)
    rc, ok := s.cache[subject]
    s.RUnlock()
    if ok {
        atomic.AddUint64(&s.cacheHits, 1)
        return rc
    }

    // Cache miss: tokenize the subject and walk the trie.
    tsa := [32]string{}
    tokens := tsa[:0]
    start := 0
    for i := 0; i < len(subject); i++ {
        if subject[i] == btsep {
            tokens = append(tokens, subject[start:i])
            start = i + 1
        }
    }
    tokens = append(tokens, subject[start:])

    // FIXME(dlc) - Make pool?
    results := []*subscription{}

    s.RLock()
    results = matchLevel(s.root, tokens, results)
    s.RUnlock()

    s.Lock()
    // Add to our cache.
    s.cache[subject] = results
    // Bound the cache by evicting one entry once we exceed slCacheMax.
    // Map iteration order is random, so this is effectively random
    // eviction of a single entry.
    if len(s.cache) > slCacheMax {
        for k := range s.cache {
            delete(s.cache, k)
            break
        }
    }
    s.Unlock()
    return results
}
// matchLevel recursively descends the trie, appending every matching
// subscription to results. At each level it first collects full-
// wildcard ('>') subs (they match any remainder), then recurses down
// the partial-wildcard ('*') branch with the remaining tokens, then
// follows the literal branch for the current token.
func matchLevel(l *level, toks []string, results []*subscription) []*subscription {
    var pwc, n *node
    for i, t := range toks {
        // Ran off the end of a branch on a previous iteration.
        if l == nil {
            return results
        }
        // '>' here matches the rest of the subject, whatever it is.
        if l.fwc != nil {
            results = append(results, l.fwc.subs...)
        }
        // Explore the '*' branch against the tokens after this one.
        if pwc = l.pwc; pwc != nil {
            results = matchLevel(pwc.next, toks[i+1:], results)
        }
        n = l.nodes[t]
        if n != nil {
            l = n.next
        } else {
            l = nil
        }
    }
    // n and pwc now hold the nodes reached by the final token; their
    // subs are exact-depth matches.
    if n != nil {
        results = append(results, n.subs...)
    }
    if pwc != nil {
        results = append(results, pwc.subs...)
    }
    return results
}
// lnt records one step of the descent (level, node, token) so the path
// can be walked back up for pruning after a removal.
type lnt struct {
    l *level
    n *node
    t string
}

// Remove removes a subscription from the sublist. Returns
// ErrInvalidSubject for malformed subjects and ErrNotFound when the
// subscription is not present. Emptied nodes are pruned bottom-up and
// affected cache entries are invalidated.
func (s *Sublist) Remove(sub *subscription) error {
    subject := string(sub.subject)
    // Tokenize on '.' using a stack array to avoid allocation for
    // subjects with up to 32 tokens.
    tsa := [32]string{}
    tokens := tsa[:0]
    start := 0
    for i := 0; i < len(subject); i++ {
        if subject[i] == btsep {
            tokens = append(tokens, subject[start:i])
            start = i + 1
        }
    }
    tokens = append(tokens, subject[start:])
    s.Lock()
    defer s.Unlock()
    sfwc := false
    l := s.root
    var n *node
    // Track levels visited so we can prune emptied nodes afterwards.
    var lnts [32]lnt
    levels := lnts[:0]
    for _, t := range tokens {
        // Empty token, or any token after '>', is invalid.
        if len(t) == 0 || sfwc {
            return ErrInvalidSubject
        }
        if l == nil {
            return ErrNotFound
        }
        switch t[0] {
        case pwc:
            n = l.pwc
        case fwc:
            n = l.fwc
            sfwc = true
        default:
            n = l.nodes[t]
        }
        if n != nil {
            levels = append(levels, lnt{l, n, t})
            l = n.next
        } else {
            l = nil
        }
    }
    if !s.removeFromNode(n, sub) {
        return ErrNotFound
    }
    s.count--
    s.removes++
    // Walk the recorded path bottom-up, pruning nodes left empty.
    for i := len(levels) - 1; i >= 0; i-- {
        l, n, t := levels[i].l, levels[i].n, levels[i].t
        if n.isEmpty() {
            l.pruneNode(n, t)
        }
    }
    s.removeFromCache(subject, sub)
    return nil
}
// pruneNode detaches an empty node from this level, whichever slot
// (literal map, '*', or '>') it occupies. A nil node is a no-op.
func (l *level) pruneNode(n *node, t string) {
    switch {
    case n == nil:
        // Nothing to prune.
    case n == l.fwc:
        l.fwc = nil
    case n == l.pwc:
        l.pwc = nil
    default:
        delete(l.nodes, t)
    }
}
// isEmpty reports whether the node holds no subscriptions and has no
// populated next level. Such nodes are candidates for pruning.
func (n *node) isEmpty() bool {
    if len(n.subs) != 0 {
        return false
    }
    return n.next == nil || n.next.numNodes() == 0
}
// numNodes returns how many child nodes hang off this level, counting
// the wildcard slots as well as the literal-token map.
func (l *level) numNodes() int {
    count := len(l.nodes)
    for _, wc := range []*node{l.pwc, l.fwc} {
        if wc != nil {
            count++
        }
    }
    return count
}
// removeFromNode removes sub from node n's subscription list, returning
// true when it was found. Uses swap-with-last removal, so element order
// is not preserved; the freed slot is nil'd so the GC can reclaim the
// subscription, and the backing array may shrink via shrinkAsNeeded.
func (s *Sublist) removeFromNode(n *node, sub *subscription) bool {
    if n == nil {
        return false
    }
    sl := n.subs
    for i := 0; i < len(sl); i++ {
        if sl[i] == sub {
            // Swap in the last element, clear it, then truncate.
            sl[i] = sl[len(sl)-1]
            sl[len(sl)-1] = nil
            sl = sl[:len(sl)-1]
            n.subs = shrinkAsNeeded(sl)
            return true
        }
    }
    return false
}
// shrinkAsNeeded returns a right-sized copy of sl when more than half
// of a non-trivial backing array is unused (large growth followed by
// many unsubscribes); otherwise sl is returned unchanged.
func shrinkAsNeeded(sl []*subscription) []*subscription {
    length, capacity := len(sl), cap(sl)
    // Small lists are not worth re-allocating.
    if capacity <= 8 {
        return sl
    }
    if free := float32(capacity-length) / float32(capacity); free > 0.50 {
        return append([]*subscription(nil), sl...)
    }
    return sl
}
// Count returns the number of subscriptions currently stored.
func (s *Sublist) Count() uint32 {
    s.RLock()
    c := s.count
    s.RUnlock()
    return c
}
// CacheCount returns the number of result sets currently held in the
// match cache.
func (s *Sublist) CacheCount() int {
    s.RLock()
    n := len(s.cache)
    s.RUnlock()
    return n
}
// SublistStats is the public, JSON-serializable stats snapshot for a
// Sublist, produced by Stats.
type SublistStats struct {
    NumSubs uint32 `json:"num_subscriptions"` // current subscription count
    NumCache uint32 `json:"num_cache"` // entries in the match cache
    NumInserts uint64 `json:"num_inserts"` // total successful inserts
    NumRemoves uint64 `json:"num_removes"` // total successful removes
    NumMatches uint64 `json:"num_matches"` // total Match calls
    CacheHitRate float64 `json:"cache_hit_rate"` // cache hits / matches
    MaxFanout uint32 `json:"max_fanout"` // largest cached result set
    AvgFanout float64 `json:"avg_fanout"` // mean cached result-set size
}
// Stats returns a snapshot of the sublist's counters plus fanout
// figures derived from the current cache contents. It takes the write
// lock even though it only reads: matches/cacheHits are bumped
// atomically by readers holding only the read lock (see Match), so the
// exclusive lock is what makes these plain reads safe.
func (s *Sublist) Stats() *SublistStats {
    s.Lock()
    defer s.Unlock()

    st := &SublistStats{}
    st.NumSubs = s.count
    st.NumCache = uint32(len(s.cache))
    st.NumInserts = s.inserts
    st.NumRemoves = s.removes
    st.NumMatches = s.matches
    if s.matches > 0 {
        st.CacheHitRate = float64(s.cacheHits) / float64(s.matches)
    }
    // whip through cache for fanout stats
    tot, max := 0, 0
    for _, results := range s.cache {
        l := len(results)
        tot += l
        if l > max {
            max = l
        }
    }
    st.MaxFanout = uint32(max)
    if tot > 0 {
        st.AvgFanout = float64(tot) / float64(len(s.cache))
    }
    return st
}
// numLevels returns the maximum depth of the Sublist trie. Exercised by
// the tests' pruning checks.
func (s *Sublist) numLevels() int {
    return visitLevel(s.root, 0)
}
// visitLevel recursively descends the trie and returns the deepest
// level reached at or below l, starting from the given depth.
func visitLevel(l *level, depth int) int {
    if l == nil || l.numNodes() == 0 {
        return depth
    }
    depth++
    maxDepth := depth
    // Gather every child node — literal tokens plus the two wildcard
    // slots — and take the deepest subtree among them.
    children := make([]*node, 0, len(l.nodes)+2)
    for _, n := range l.nodes {
        children = append(children, n)
    }
    children = append(children, l.pwc, l.fwc)
    for _, n := range children {
        if n == nil {
            continue
        }
        if d := visitLevel(n.next, depth); d > maxDepth {
            maxDepth = d
        }
    }
    return maxDepth
}
// IsValidLiteralSubject returns true if the subject is a valid literal
// subject: non-empty '.'-separated tokens, none of which is a lone
// wildcard token. A wildcard character embedded in a longer token
// (e.g. "f*o") is treated as a literal character.
func IsValidLiteralSubject(subject string) bool {
    // subject is already a string; the original string() conversion
    // was a no-op and has been dropped.
    for _, t := range strings.Split(subject, tsep) {
        if len(t) == 0 {
            return false
        }
        if len(t) > 1 {
            continue
        }
        switch t[0] {
        case pwc, fwc:
            return false
        }
    }
    return true
}
// matchLiteral reports whether the literal subject would be matched by
// the (possibly wildcarded) target subject. The cache layer uses this
// to decide which cached result sets an insert or remove affects.
func matchLiteral(literal, subject string) bool {
    li := 0
    ll := len(literal)
    for i := 0; i < len(subject); i++ {
        if li >= ll {
            return false
        }
        b := subject[i]
        switch b {
        case pwc:
            // '*' consumes one token of the literal: advance li to the
            // next separator (or end), then back up one so the shared
            // li++ below lands exactly on the separator. The original
            // re-declared ll here, pointlessly shadowing the outer ll
            // with the same value; the shadow has been removed.
            for {
                if li >= ll || literal[li] == btsep {
                    li--
                    break
                }
                li++
            }
        case fwc:
            // '>' matches everything that remains.
            return true
        default:
            if b != literal[li] {
                return false
            }
        }
        li++
    }
    // Make sure we have consumed all of the literal's characters.
    return li >= ll
}

457
server/sublist_test.go Normal file
View File

@@ -0,0 +1,457 @@
package server
import (
"fmt"
"runtime"
"strings"
"sync"
"testing"
"time"
dbg "runtime/debug"
)
// stackFatalf fails the test like t.Fatalf but appends a list of caller
// file:line frames (skipping this helper and the verify* frame), so a
// failure inside a shared helper points back at the real test line.
func stackFatalf(t *testing.T, f string, args ...interface{}) {
    lines := make([]string, 0, 32)
    msg := fmt.Sprintf(f, args...)
    lines = append(lines, msg)
    // Generate the Stack of callers: Skip us and verify* frames.
    for i := 2; true; i++ {
        _, file, line, ok := runtime.Caller(i)
        if !ok {
            break
        }
        msg := fmt.Sprintf("%d - %s:%d", i, file, line)
        lines = append(lines, msg)
    }
    t.Fatalf("%s", strings.Join(lines, "\n"))
}
// verifyCount asserts the sublist holds exactly count subscriptions.
func verifyCount(s *Sublist, count uint32, t *testing.T) {
    if s.Count() != count {
        stackFatalf(t, "Count is %d, should be %d", s.Count(), count)
    }
}

// verifyLen asserts a non-nil result slice of exactly l entries.
func verifyLen(r []*subscription, l int, t *testing.T) {
    if r == nil || len(r) != l {
        stackFatalf(t, "Results len is %d, should be %d", len(r), l)
    }
}

// verifyNumLevels asserts the trie depth, used to check pruning.
func verifyNumLevels(s *Sublist, expected int, t *testing.T) {
    dl := s.numLevels()
    if dl != expected {
        stackFatalf(t, "NumLevels is %d, should be %d", dl, expected)
    }
}

// verifyMember asserts that val appears (by pointer identity) in r.
func verifyMember(r []*subscription, val *subscription, t *testing.T) {
    for _, v := range r {
        if v == nil {
            continue
        }
        if v == val {
            return
        }
    }
    stackFatalf(t, "Value '%+v' not found in results", val)
}
// newSub builds a bare test subscription for the given subject.
func newSub(subject string) *subscription {
    sub := &subscription{}
    sub.subject = []byte(subject)
    return sub
}
func TestSublistInit(t *testing.T) {
s := NewSublist()
verifyCount(s, 0, t)
}
func TestSublistInsertCount(t *testing.T) {
s := NewSublist()
s.Insert(newSub("foo"))
s.Insert(newSub("bar"))
s.Insert(newSub("foo.bar"))
verifyCount(s, 3, t)
}
func TestSublistSimple(t *testing.T) {
s := NewSublist()
subject := "foo"
sub := newSub(subject)
s.Insert(sub)
r := s.Match(subject)
verifyLen(r, 1, t)
verifyMember(r, sub, t)
}
func TestSublistSimpleMultiTokens(t *testing.T) {
s := NewSublist()
subject := "foo.bar.baz"
sub := newSub(subject)
s.Insert(sub)
r := s.Match(subject)
verifyLen(r, 1, t)
verifyMember(r, sub, t)
}
func TestSublistPartialWildcard(t *testing.T) {
s := NewSublist()
lsub := newSub("a.b.c")
psub := newSub("a.*.c")
s.Insert(lsub)
s.Insert(psub)
r := s.Match("a.b.c")
verifyLen(r, 2, t)
verifyMember(r, lsub, t)
verifyMember(r, psub, t)
}
func TestSublistPartialWildcardAtEnd(t *testing.T) {
s := NewSublist()
lsub := newSub("a.b.c")
psub := newSub("a.b.*")
s.Insert(lsub)
s.Insert(psub)
r := s.Match("a.b.c")
verifyLen(r, 2, t)
verifyMember(r, lsub, t)
verifyMember(r, psub, t)
}
func TestSublistFullWildcard(t *testing.T) {
s := NewSublist()
lsub := newSub("a.b.c")
fsub := newSub("a.>")
s.Insert(lsub)
s.Insert(fsub)
r := s.Match("a.b.c")
verifyLen(r, 2, t)
verifyMember(r, lsub, t)
verifyMember(r, fsub, t)
}
func TestSublistRemove(t *testing.T) {
s := NewSublist()
subject := "a.b.c.d"
sub := newSub(subject)
s.Insert(sub)
verifyCount(s, 1, t)
r := s.Match(subject)
verifyLen(r, 1, t)
s.Remove(newSub("a.b.c"))
verifyCount(s, 1, t)
s.Remove(sub)
verifyCount(s, 0, t)
r = s.Match(subject)
verifyLen(r, 0, t)
}
func TestSublistRemoveWildcard(t *testing.T) {
s := NewSublist()
subject := "a.b.c.d"
sub := newSub(subject)
psub := newSub("a.b.*.d")
fsub := newSub("a.b.>")
s.Insert(sub)
s.Insert(psub)
s.Insert(fsub)
verifyCount(s, 3, t)
r := s.Match(subject)
verifyLen(r, 3, t)
s.Remove(sub)
verifyCount(s, 2, t)
s.Remove(fsub)
verifyCount(s, 1, t)
s.Remove(psub)
verifyCount(s, 0, t)
r = s.Match(subject)
verifyLen(r, 0, t)
}
func TestSublistRemoveCleanup(t *testing.T) {
s := NewSublist()
literal := "a.b.c.d.e.f"
depth := len(strings.Split(literal, tsep))
sub := newSub(literal)
verifyNumLevels(s, 0, t)
s.Insert(sub)
verifyNumLevels(s, depth, t)
s.Remove(sub)
verifyNumLevels(s, 0, t)
}
func TestSublistRemoveCleanupWildcards(t *testing.T) {
s := NewSublist()
subject := "a.b.*.d.e.>"
depth := len(strings.Split(subject, tsep))
sub := newSub(subject)
verifyNumLevels(s, 0, t)
s.Insert(sub)
verifyNumLevels(s, depth, t)
s.Remove(sub)
verifyNumLevels(s, 0, t)
}
func TestSublistInvalidSubjectsInsert(t *testing.T) {
    s := NewSublist()

    // Insert, i.e. subscriptions, can have wildcards, but cannot have
    // empty tokens, and cannot have a FWC that is not the terminal token.

    // beginning empty token
    if err := s.Insert(newSub(".foo")); err != ErrInvalidSubject {
        t.Fatal("Expected invalid subject error")
    }

    // trailing empty token
    if err := s.Insert(newSub("foo.")); err != ErrInvalidSubject {
        t.Fatal("Expected invalid subject error")
    }

    // empty middle token
    if err := s.Insert(newSub("foo..bar")); err != ErrInvalidSubject {
        t.Fatal("Expected invalid subject error")
    }

    // empty middle token #2
    if err := s.Insert(newSub("foo.bar..baz")); err != ErrInvalidSubject {
        t.Fatal("Expected invalid subject error")
    }

    // fwc not terminal
    if err := s.Insert(newSub("foo.>.bar")); err != ErrInvalidSubject {
        t.Fatal("Expected invalid subject error")
    }
}
func TestSublistCache(t *testing.T) {
s := NewSublist()
// Test add a remove logistics
subject := "a.b.c.d"
sub := newSub(subject)
psub := newSub("a.b.*.d")
fsub := newSub("a.b.>")
s.Insert(sub)
r := s.Match(subject)
verifyLen(r, 1, t)
s.Insert(psub)
s.Insert(fsub)
verifyCount(s, 3, t)
r = s.Match(subject)
verifyLen(r, 3, t)
s.Remove(sub)
verifyCount(s, 2, t)
s.Remove(fsub)
verifyCount(s, 1, t)
s.Remove(psub)
verifyCount(s, 0, t)
// Check that cache is now empty
if cc := s.CacheCount(); cc != 0 {
t.Fatalf("Cache should be zero, got %d\n", cc)
}
r = s.Match(subject)
verifyLen(r, 0, t)
for i := 0; i < 2*slCacheMax; i++ {
s.Match(fmt.Sprintf("foo-%d\n", i))
}
if cc := s.CacheCount(); cc > slCacheMax {
t.Fatalf("Cache should be constrained by cacheMax, got %d for current count\n", cc)
}
}
func checkBool(b, expected bool, t *testing.T) {
if b != expected {
dbg.PrintStack()
t.Fatalf("Expected %v, but got %v\n", expected, b)
}
}
func TestSublistValidLiteralSubjects(t *testing.T) {
checkBool(IsValidLiteralSubject("foo"), true, t)
checkBool(IsValidLiteralSubject(".foo"), false, t)
checkBool(IsValidLiteralSubject("foo."), false, t)
checkBool(IsValidLiteralSubject("foo..bar"), false, t)
checkBool(IsValidLiteralSubject("foo.bar.*"), false, t)
checkBool(IsValidLiteralSubject("foo.bar.>"), false, t)
checkBool(IsValidLiteralSubject("*"), false, t)
checkBool(IsValidLiteralSubject(">"), false, t)
}
func TestSublistMatchLiterals(t *testing.T) {
checkBool(matchLiteral("foo", "foo"), true, t)
checkBool(matchLiteral("foo", "bar"), false, t)
checkBool(matchLiteral("foo", "*"), true, t)
checkBool(matchLiteral("foo", ">"), true, t)
checkBool(matchLiteral("foo.bar", ">"), true, t)
checkBool(matchLiteral("foo.bar", "foo.>"), true, t)
checkBool(matchLiteral("foo.bar", "bar.>"), false, t)
checkBool(matchLiteral("stats.test.22", "stats.>"), true, t)
checkBool(matchLiteral("stats.test.22", "stats.*.*"), true, t)
checkBool(matchLiteral("foo.bar", "foo"), false, t)
checkBool(matchLiteral("stats.test.foos", "stats.test.foos"), true, t)
checkBool(matchLiteral("stats.test.foos", "stats.test.foo"), false, t)
}
func TestSublistBadSubjectOnRemove(t *testing.T) {
bad := "a.b..d"
sub := newSub(bad)
s := NewSublist()
if err := s.Insert(sub); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject, got %v\n", err)
}
if err := s.Remove(sub); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject, got %v\n", err)
}
badfwc := "a.>.b"
if err := s.Remove(newSub(badfwc)); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject, got %v\n", err)
}
}
// This is from bug report #18
func TestSublistTwoTokenPubMatchSingleTokenSub(t *testing.T) {
s := NewSublist()
sub := newSub("foo")
s.Insert(sub)
r := s.Match("foo")
verifyLen(r, 1, t)
verifyMember(r, sub, t)
r = s.Match("foo.bar")
verifyLen(r, 0, t)
}
// -- Benchmarks Setup --
var subs []*subscription
var toks = []string{"apcera", "continuum", "component", "router", "api", "imgr", "jmgr", "auth"}
var sl = NewSublist()
var results = make([]*subscription, 0, 64)
func init() {
subs = make([]*subscription, 0, 256*1024)
subsInit("")
for i := 0; i < len(subs); i++ {
sl.Insert(subs[i])
}
addWildcards()
}
func subsInit(pre string) {
var sub string
for _, t := range toks {
if len(pre) > 0 {
sub = pre + tsep + t
} else {
sub = t
}
subs = append(subs, newSub(sub))
if len(strings.Split(sub, tsep)) < 5 {
subsInit(sub)
}
}
}
func addWildcards() {
sl.Insert(newSub("cloud.>"))
sl.Insert(newSub("cloud.continuum.component.>"))
sl.Insert(newSub("cloud.*.*.router.*"))
}
// -- Benchmarks Setup End --
func Benchmark______________________SublistInsert(b *testing.B) {
s := NewSublist()
for i, l := 0, len(subs); i < b.N; i++ {
index := i % l
s.Insert(subs[index])
}
}
func Benchmark____________SublistMatchSingleToken(b *testing.B) {
for i := 0; i < b.N; i++ {
sl.Match("apcera")
}
}
func Benchmark______________SublistMatchTwoTokens(b *testing.B) {
for i := 0; i < b.N; i++ {
sl.Match("apcera.continuum")
}
}
func Benchmark____________SublistMatchThreeTokens(b *testing.B) {
for i := 0; i < b.N; i++ {
sl.Match("apcera.continuum.component")
}
}
func Benchmark_____________SublistMatchFourTokens(b *testing.B) {
for i := 0; i < b.N; i++ {
sl.Match("apcera.continuum.component.router")
}
}
func Benchmark_SublistMatchFourTokensSingleResult(b *testing.B) {
for i := 0; i < b.N; i++ {
sl.Match("apcera.continuum.component.router")
}
}
func Benchmark_SublistMatchFourTokensMultiResults(b *testing.B) {
for i := 0; i < b.N; i++ {
sl.Match("cloud.continuum.component.router")
}
}
func Benchmark_______SublistMissOnLastTokenOfFive(b *testing.B) {
for i := 0; i < b.N; i++ {
sl.Match("apcera.continuum.component.router.ZZZZ")
}
}
func multiRead(b *testing.B, num int) {
b.StopTimer()
var swg, fwg sync.WaitGroup
swg.Add(num)
fwg.Add(num)
s := "apcera.continuum.component.router"
for i := 0; i < num; i++ {
go func() {
swg.Done()
swg.Wait()
for i := 0; i < b.N; i++ {
sl.Match(s)
}
fwg.Done()
}()
}
swg.Wait()
b.StartTimer()
fwg.Wait()
}
func Benchmark_____________Sublist10XMultipleReads(b *testing.B) {
multiRead(b, 10)
}
func Benchmark____________Sublist100XMultipleReads(b *testing.B) {
multiRead(b, 100)
}
func _BenchmarkRSS(b *testing.B) {
runtime.GC()
var m runtime.MemStats
runtime.ReadMemStats(&m)
println("HEAP:", m.HeapObjects)
println("ALLOC:", m.Alloc)
println("TOTAL ALLOC:", m.TotalAlloc)
time.Sleep(30 * 1e9)
}

View File

@@ -1,555 +0,0 @@
// Copyright 2012-2016 Apcera Inc. All rights reserved.
// Package sublist handles subject distribution and provides a facility
// match subjects to interested subscribers. Subscribers can have wildcard
// subjects to match multiple published subjects.
package sublist
import (
"errors"
"sync"
"sync/atomic"
"time"
"github.com/nats-io/gnatsd/hashmap"
)
// A Sublist stores and efficiently retrieves subscriptions. It uses a
// tree structure and an efficient RR cache to achieve quick lookups.
type Sublist struct {
	// mu guards root, count, cache and the non-atomic stats fields.
	mu sync.RWMutex
	// root is the top level of the subject token trie.
	root *level
	// count is the total number of stored subscriptions.
	count uint32
	// cache maps literal subjects to previously computed result sets.
	cache *hashmap.HashMap
	// cmax bounds the number of cache entries before random eviction.
	cmax int
	stats stats
}
// Use padding for better cache alignment when multiple goroutines are
// updating the stats. The padding arrays place each counter on its own
// cache line to reduce false sharing.
type stats struct {
	_ [8]int64
	// matches counts total Match calls (updated atomically).
	matches uint64
	_ [7]int64
	// cacheHits counts Match calls served from the cache (atomic).
	cacheHits uint64
	_ [7]int64
	// inserts counts successful Insert calls.
	inserts uint64
	_ [7]int64
	// removes counts successful Remove calls.
	removes uint64
	_ [7]int64
	// since records when stats collection last (re)started.
	since time.Time
}
// A node contains subscriptions and a pointer to the next level.
type node struct {
	// next is the level for the following subject token, if any.
	next *level
	// subs holds the subscriptions terminating at this node.
	subs []interface{}
}
// A level represents a group of nodes and special pointers to
// wildcard nodes.
type level struct {
	// nodes maps a literal token to its node.
	nodes *hashmap.HashMap
	// pwc and fwc are the partial ('*') and full ('>') wildcard nodes.
	pwc, fwc *node
}
// Create a new default node.
// The subs slice is pre-sized to 4 to avoid early append growth.
func newNode() *node {
	return &node{subs: make([]interface{}, 0, 4)}
}
// Create a new default level. We use FNV1A as the hash
// algorithm for the tokens, which should be short.
func newLevel() *level {
	h := hashmap.New()
	return &level{nodes: h}
}
// defaultCacheMax is used to bound limit the frontend cache
const defaultCacheMax = 1024
// New will create a default sublist
func New() *Sublist {
	return &Sublist{
		root: newLevel(),
		cache: hashmap.New(),
		cmax: defaultCacheMax,
		stats: stats{since: time.Now()},
	}
}
// Common byte variables for wildcards and token separator.
const (
	_PWC = byte('*')
	_FWC = byte('>')
	_SEP = byte('.')
)
// Sublist related errors
// NOTE(review): error strings are capitalized, contrary to Go
// convention (staticcheck ST1005); left untouched in case callers
// match on the exact message text.
var (
	ErrInvalidSubject = errors.New("Invalid Subject")
	ErrNotFound = errors.New("No Matches Found")
)
// split tokenizes subject on '.' separators, appending each token into
// tokens (typically a stack-allocated backing array) and returning the
// extended slice.
func split(subject []byte, tokens [][]byte) [][]byte {
	start := 0
	for i := 0; i < len(subject); i++ {
		if subject[i] != _SEP {
			continue
		}
		tokens = append(tokens, subject[start:i])
		start = i + 1
	}
	// The final token runs to the end of the subject.
	return append(tokens, subject[start:])
}
// Insert adds a subject into the sublist
//
// Returns ErrInvalidSubject if the subject contains an empty token or
// any token after a full wildcard ('>').
func (s *Sublist) Insert(subject []byte, sub interface{}) error {
	// Stack-allocated token array avoids a heap allocation for the
	// common case of at most 16 tokens.
	tsa := [16][]byte{}
	toks := split(subject, tsa[:0])
	s.mu.Lock()
	// sfwc is set once a full wildcard is seen; any further token is invalid.
	sfwc := false
	l := s.root
	var n *node
	for _, t := range toks {
		if len(t) == 0 || sfwc {
			s.mu.Unlock()
			return ErrInvalidSubject
		}
		// Locate the node for this token at the current level.
		switch t[0] {
		case _PWC:
			n = l.pwc
		case _FWC:
			n = l.fwc
			sfwc = true
		default:
			if v := l.nodes.Get(t); v == nil {
				n = nil
			} else {
				n = v.(*node)
			}
		}
		// Create the node if this token has not been seen here before.
		if n == nil {
			n = newNode()
			switch t[0] {
			case _PWC:
				l.pwc = n
			case _FWC:
				l.fwc = n
			default:
				l.nodes.Set(t, n)
			}
		}
		if n.next == nil {
			n.next = newLevel()
		}
		l = n.next
	}
	// n is the node for the final token; attach the subscription there.
	n.subs = append(n.subs, sub)
	s.count++
	s.stats.inserts++
	// Keep any affected cache entries consistent with the new sub.
	s.addToCache(subject, sub)
	s.mu.Unlock()
	return nil
}
// addToCache will add the new entry to existing cache
// entries if needed. Assumes write lock is held.
// Every cached literal subject that the inserted subject matches gets
// the new sub appended to its result set.
func (s *Sublist) addToCache(subject []byte, sub interface{}) {
	if s.cache.Count() == 0 {
		return
	}
	// FIXME(dlc) avoid allocation?
	all := s.cache.AllKeys()
	for _, k := range all {
		// subject may contain wildcards; k is always a literal.
		if !matchLiteral(k, subject) {
			continue
		}
		r := s.cache.Get(k)
		if r == nil {
			continue
		}
		res := r.([]interface{})
		res = append(res, sub)
		s.cache.Set(k, res)
	}
}
// removeFromCache invalidates every active cache entry whose literal
// subject matches the removed subscription's subject. Assumes the write
// lock is held.
func (s *Sublist) removeFromCache(subject []byte, sub interface{}) {
	if s.cache.Count() == 0 {
		return
	}
	// FIXME(dlc), right now just remove all matching cache
	// entries. This could be smarter and walk small result
	// lists and delete the individual sub.
	for _, k := range s.cache.AllKeys() {
		if matchLiteral(k, subject) {
			s.cache.Remove(k)
		}
	}
}
// Match will match all entries to the literal subject. It will return a
// slice of results.
//
// Results are served from the front-end cache when possible; on a miss
// the trie is walked and the computed result set is cached, with random
// eviction once the cache reaches cmax entries.
func (s *Sublist) Match(subject []byte) []interface{} {
	// Fastpath match on cache
	s.mu.RLock()
	atomic.AddUint64(&s.stats.matches, 1)
	r := s.cache.Get(subject)
	s.mu.RUnlock()
	if r != nil {
		atomic.AddUint64(&s.stats.cacheHits, 1)
		return r.([]interface{})
	}
	// Cache miss
	// Process subject into tokens, this is performed
	// unlocked, so can be parallel.
	tsa := [32][]byte{}
	toks := tsa[:0]
	start := 0
	for i, b := range subject {
		if b == _SEP {
			toks = append(toks, subject[start:i])
			start = i + 1
		}
	}
	toks = append(toks, subject[start:])
	results := make([]interface{}, 0, 4)
	// Lock the sublist and lookup and add entry to cache.
	s.mu.Lock()
	matchLevel(s.root, toks, &results)
	// We use random eviction to bound the size of the cache.
	// RR is used for speed purposes here.
	if int(s.cache.Count()) >= s.cmax {
		s.cache.RemoveRandom()
	}
	// Make sure we copy the subject key here
	// (the caller may reuse its buffer after Match returns).
	scopy := make([]byte, len(subject))
	copy(scopy, subject)
	s.cache.Set(scopy, results)
	s.mu.Unlock()
	return results
}
// matchLevel is used to recursively descend into the trie when there
// is a cache miss, collecting matching subscriptions into results.
// At each level it gathers full-wildcard subs, recurses through the
// partial-wildcard branch, and follows the literal token branch.
func matchLevel(l *level, toks [][]byte, results *[]interface{}) {
	var pwc, n *node
	for i, t := range toks {
		if l == nil {
			return
		}
		// A full wildcard at this level matches everything below it.
		if l.fwc != nil {
			*results = append(*results, l.fwc.subs...)
		}
		// A partial wildcard consumes this token; match the remaining
		// tokens against its subtree.
		if pwc = l.pwc; pwc != nil {
			matchLevel(pwc.next, toks[i+1:], results)
		}
		if v := l.nodes.Get(t); v == nil {
			n = nil
		} else {
			n = v.(*node)
		}
		if n != nil {
			l = n.next
		} else {
			l = nil
		}
	}
	// n and pwc now hold the nodes reached by the final token; their
	// subs are matches for the full subject.
	if n != nil {
		*results = append(*results, n.subs...)
	}
	if pwc != nil {
		*results = append(*results, pwc.subs...)
	}
}
// lnt is used to track descent into a removal for pruning.
// It records the level, node and token visited at one step of the walk
// so empty nodes can be pruned bottom-up after the removal.
type lnt struct {
	l *level
	n *node
	t []byte
}
// Remove will remove any item associated with key. It will track descent
// into the trie and prune upon successful removal.
//
// Returns ErrInvalidSubject for malformed subjects and ErrNotFound when
// no matching subscription exists.
func (s *Sublist) Remove(subject []byte, sub interface{}) error {
	tsa := [16][]byte{}
	toks := split(subject, tsa[:0])
	s.mu.Lock()
	sfwc := false
	l := s.root
	var n *node
	// Record the path taken so empty nodes can be pruned afterwards.
	var lnts [32]lnt
	levels := lnts[:0]
	for _, t := range toks {
		if len(t) == 0 || sfwc {
			s.mu.Unlock()
			return ErrInvalidSubject
		}
		if l == nil {
			s.mu.Unlock()
			return ErrNotFound
		}
		switch t[0] {
		case _PWC:
			n = l.pwc
		case _FWC:
			n = l.fwc
			sfwc = true
		default:
			if v := l.nodes.Get(t); v == nil {
				n = nil
			} else {
				n = v.(*node)
			}
		}
		if n != nil {
			levels = append(levels, lnt{l, n, t})
			l = n.next
		} else {
			l = nil
		}
	}
	if !s.removeFromNode(n, sub) {
		s.mu.Unlock()
		return ErrNotFound
	}
	s.count--
	s.stats.removes++
	// Walk back up the recorded path and prune nodes left empty.
	for i := len(levels) - 1; i >= 0; i-- {
		l, n, t := levels[i].l, levels[i].n, levels[i].t
		if n.isEmpty() {
			l.pruneNode(n, t)
		}
	}
	// Invalidate any cache entries this subject matched.
	s.removeFromCache(subject, sub)
	s.mu.Unlock()
	return nil
}
// pruneNode detaches an empty node from its level, clearing the matching
// wildcard pointer or removing the literal token entry.
func (l *level) pruneNode(n *node, t []byte) {
	if n == nil {
		return
	}
	switch n {
	case l.fwc:
		l.fwc = nil
	case l.pwc:
		l.pwc = nil
	default:
		l.nodes.Remove(t)
	}
}
// isEmpty reports whether the node holds no subscriptions and no
// populated next level; such nodes are candidates for pruning.
func (n *node) isEmpty() bool {
	return len(n.subs) == 0 && (n.next == nil || n.next.numNodes() == 0)
}
// numNodes returns how many nodes exist at this level, counting the
// literal-token nodes plus any wildcard nodes.
func (l *level) numNodes() uint32 {
	num := l.nodes.Count()
	for _, wc := range []*node{l.pwc, l.fwc} {
		if wc != nil {
			num++
		}
	}
	return num
}
// removeFromNode deletes sub from the node's subscription list,
// preserving the order of the remaining entries. It reports whether the
// subscription was found.
func (s *Sublist) removeFromNode(n *node, sub interface{}) bool {
	if n == nil {
		return false
	}
	for i := range n.subs {
		if n.subs[i] != sub {
			continue
		}
		// Order-preserving in-place delete.
		n.subs = append(n.subs[:i], n.subs[i+1:]...)
		return true
	}
	return false
}
// matchLiteral is used to test literal subjects, those that do not have any
// wildcards, with a target subject (which may contain wildcards). This is
// used in the cache layer to find entries affected by an insert or remove.
func matchLiteral(literal, subject []byte) bool {
	li := 0
	ll := len(literal)
	for _, b := range subject {
		if li >= ll {
			return false
		}
		switch b {
		case _PWC:
			// Skip the current token in the literal. li is backed up one
			// byte before break so the shared li++ below lands on the
			// separator (or one past the end).
			// (An inner `ll := len(literal)` previously shadowed the
			// identical outer ll here; removed.)
			for {
				if li >= ll || literal[li] == _SEP {
					li--
					break
				}
				li++
			}
		case _FWC:
			// Full wildcard matches the remainder of the literal.
			return true
		default:
			if b != literal[li] {
				return false
			}
		}
		li++
	}
	// Make sure we have processed all of the literal's chars..
	if li < ll {
		return false
	}
	return true
}
// IsValidLiteralSubject reports whether subject is a valid literal
// subject: no empty tokens and no single-character wildcard tokens.
func IsValidLiteralSubject(subject []byte) bool {
	tsa := [16][]byte{}
	for _, tok := range split(subject, tsa[:0]) {
		switch {
		case len(tok) == 0:
			return false
		case len(tok) > 1:
			// Multi-byte tokens are literals even if they contain
			// wildcard characters.
			continue
		}
		if tok[0] == _PWC || tok[0] == _FWC {
			return false
		}
	}
	return true
}
// Count returns the number of subscriptions stored in the Sublist.
// NOTE(review): reads s.count without holding s.mu, so it may race with
// concurrent Insert/Remove — confirm callers tolerate a stale value.
func (s *Sublist) Count() uint32 { return s.count }
// Stats for the sublist
type Stats struct {
	// NumSubs is the current number of subscriptions.
	NumSubs uint32 `json:"num_subscriptions"`
	// NumCache is the current number of front-end cache entries.
	NumCache uint32 `json:"num_cache"`
	NumInserts uint64 `json:"num_inserts"`
	NumRemoves uint64 `json:"num_removes"`
	NumMatches uint64 `json:"num_matches"`
	// CacheHitRate is cacheHits / matches, 0 when no matches occurred.
	CacheHitRate float64 `json:"cache_hit_rate"`
	// MaxFanout and AvgFanout describe cached result-set sizes.
	MaxFanout uint32 `json:"max_fanout"`
	AvgFanout float64 `json:"avg_fanout"`
	// StatsTime is when stats collection last (re)started.
	StatsTime time.Time `json:"stats_time"`
}
// Stats will return a stats structure for the current state.
func (s *Sublist) Stats() *Stats {
	s.mu.Lock()
	defer s.mu.Unlock()
	st := &Stats{}
	st.NumSubs = s.count
	st.NumCache = s.cache.Count()
	st.NumInserts = s.stats.inserts
	st.NumRemoves = s.stats.removes
	st.NumMatches = s.stats.matches
	if s.stats.matches > 0 {
		st.CacheHitRate = float64(s.stats.cacheHits) / float64(s.stats.matches)
	}
	// whip through cache for fanout stats
	// FIXME, creating all each time could be expensive, should do a cb version.
	tot, max := 0, 0
	all := s.cache.All()
	for _, r := range all {
		// Each cache value is a result set; its length is the fanout.
		l := len(r.([]interface{}))
		tot += l
		if l > max {
			max = l
		}
	}
	st.MaxFanout = uint32(max)
	if tot > 0 {
		st.AvgFanout = float64(tot) / float64(len(all))
	}
	st.StatsTime = s.stats.since
	return st
}
// ResetStats will clear stats and update StatsTime to time.Now()
// The write lock is taken because Stats(), Insert() and Remove() access
// s.stats under s.mu; previously this reset raced with them.
func (s *Sublist) ResetStats() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.stats = stats{since: time.Now()}
}
// numLevels will return the maximum number of levels
// contained in the Sublist tree.
// Used by tests to verify pruning behavior.
func (s *Sublist) numLevels() int {
	return visitLevel(s.root, 0)
}
// visitLevel is used to descend the Sublist tree structure
// recursively, returning the maximum depth reached below this level.
func visitLevel(l *level, depth int) int {
	if l == nil || l.numNodes() == 0 {
		return depth
	}
	depth++
	maxDepth := depth
	// Check every literal-token subtree.
	all := l.nodes.All()
	for _, a := range all {
		if a == nil {
			continue
		}
		n := a.(*node)
		newDepth := visitLevel(n.next, depth)
		if newDepth > maxDepth {
			maxDepth = newDepth
		}
	}
	// The wildcard branches may be deeper than any literal branch.
	if l.pwc != nil {
		pwcDepth := visitLevel(l.pwc.next, depth)
		if pwcDepth > maxDepth {
			maxDepth = pwcDepth
		}
	}
	if l.fwc != nil {
		fwcDepth := visitLevel(l.fwc.next, depth)
		if fwcDepth > maxDepth {
			maxDepth = fwcDepth
		}
	}
	return maxDepth
}

View File

@@ -1,529 +0,0 @@
package sublist
import (
"bytes"
"fmt"
"runtime"
"runtime/debug"
"strings"
"sync"
"testing"
"time"
)
// verifyCount asserts the sublist reports the expected subscription count.
func verifyCount(s *Sublist, count uint32, t *testing.T) {
	if s.Count() != count {
		t.Errorf("Count is %d, should be %d", s.Count(), count)
	}
}
// verifyLen asserts a match result set has the expected length.
func verifyLen(r []interface{}, l int, t *testing.T) {
	if len(r) != l {
		t.Errorf("Results len is %d, should be %d", len(r), l)
	}
}
// verifyMember asserts that the string val is present in the result set.
func verifyMember(r []interface{}, val string, t *testing.T) {
	for _, v := range r {
		if v == nil {
			continue
		}
		if v.(string) == val {
			return
		}
	}
	t.Errorf("Value '%s' not found in results", val)
}
// verifyNumLevels asserts the depth of the sublist trie.
func verifyNumLevels(s *Sublist, expected int, t *testing.T) {
	dl := s.numLevels()
	if dl != expected {
		t.Errorf("NumLevels is %d, should be %d", dl, expected)
	}
}
func TestInit(t *testing.T) {
s := New()
verifyCount(s, 0, t)
}
func TestInsertCount(t *testing.T) {
s := New()
s.Insert([]byte("foo"), "a")
s.Insert([]byte("bar"), "b")
s.Insert([]byte("foo.bar"), "b")
verifyCount(s, 3, t)
}
func TestSimple(t *testing.T) {
s := New()
val := "a"
sub := []byte("foo")
s.Insert(sub, val)
r := s.Match(sub)
verifyLen(r, 1, t)
verifyMember(r, val, t)
}
func TestSimpleMultiTokens(t *testing.T) {
s := New()
val := "a"
sub := []byte("foo.bar.baz")
s.Insert(sub, val)
r := s.Match(sub)
verifyLen(r, 1, t)
verifyMember(r, val, t)
}
func TestPartialWildcard(t *testing.T) {
s := New()
literal := []byte("a.b.c")
pwc := []byte("a.*.c")
a, b := "a", "b"
s.Insert(literal, a)
s.Insert(pwc, b)
r := s.Match(literal)
verifyLen(r, 2, t)
verifyMember(r, a, t)
verifyMember(r, b, t)
}
func TestPartialWildcardAtEnd(t *testing.T) {
s := New()
literal := []byte("a.b.c")
pwc := []byte("a.b.*")
a, b := "a", "b"
s.Insert(literal, a)
s.Insert(pwc, b)
r := s.Match(literal)
verifyLen(r, 2, t)
verifyMember(r, a, t)
verifyMember(r, b, t)
}
func TestFullWildcard(t *testing.T) {
s := New()
literal := []byte("a.b.c")
fwc := []byte("a.>")
a, b := "a", "b"
s.Insert(literal, a)
s.Insert(fwc, b)
r := s.Match(literal)
verifyLen(r, 2, t)
verifyMember(r, a, t)
verifyMember(r, b, t)
}
func TestRemove(t *testing.T) {
s := New()
literal := []byte("a.b.c.d")
value := "foo"
s.Insert(literal, value)
verifyCount(s, 1, t)
s.Remove(literal, "bar")
verifyCount(s, 1, t)
s.Remove([]byte("a.b.c"), value)
verifyCount(s, 1, t)
s.Remove(literal, value)
verifyCount(s, 0, t)
r := s.Match(literal)
verifyLen(r, 0, t)
}
func TestRemoveWildcard(t *testing.T) {
s := New()
literal := []byte("a.b.c.d")
pwc := []byte("a.b.*.d")
fwc := []byte("a.b.>")
value := "foo"
s.Insert(pwc, value)
s.Insert(fwc, value)
s.Insert(literal, value)
verifyCount(s, 3, t)
r := s.Match(literal)
verifyLen(r, 3, t)
s.Remove(literal, value)
verifyCount(s, 2, t)
s.Remove(fwc, value)
verifyCount(s, 1, t)
s.Remove(pwc, value)
verifyCount(s, 0, t)
}
func TestRemoveCleanup(t *testing.T) {
s := New()
literal := []byte("a.b.c.d.e.f")
depth := len(bytes.Split(literal, []byte(".")))
value := "foo"
verifyNumLevels(s, 0, t)
s.Insert(literal, value)
verifyNumLevels(s, depth, t)
s.Remove(literal, value)
verifyNumLevels(s, 0, t)
}
func TestRemoveCleanupWildcards(t *testing.T) {
s := New()
literal := []byte("a.b.*.d.e.>")
depth := len(bytes.Split(literal, []byte(".")))
value := "foo"
verifyNumLevels(s, 0, t)
s.Insert(literal, value)
verifyNumLevels(s, depth, t)
s.Remove(literal, value)
verifyNumLevels(s, 0, t)
}
func TestInvalidSubjectsInsert(t *testing.T) {
s := New()
// Insert, or subscribtions, can have wildcards, but not empty tokens,
// and can not have a FWC that is not terminal
// beginning empty token
if err := s.Insert([]byte(".foo"), '@'); err != ErrInvalidSubject {
t.Fatal("Expected invalid subject error")
}
// trailing empty token
if err := s.Insert([]byte("foo."), '@'); err != ErrInvalidSubject {
t.Fatal("Expected invalid subject error")
}
// empty middle token
if err := s.Insert([]byte("foo..bar"), '@'); err != ErrInvalidSubject {
t.Fatal("Expected invalid subject error")
}
// empty middle token #2
if err := s.Insert([]byte("foo.bar..baz"), '@'); err != ErrInvalidSubject {
t.Fatal("Expected invalid subject error")
}
// fwc not terminal
if err := s.Insert([]byte("foo.>.bar"), '@'); err != ErrInvalidSubject {
t.Fatal("Expected invalid subject error")
}
}
func TestCacheBehavior(t *testing.T) {
s := New()
literal := []byte("a.b.c")
fwc := []byte("a.>")
a, b := "a", "b"
s.Insert(literal, a)
r := s.Match(literal)
verifyLen(r, 1, t)
s.Insert(fwc, b)
r = s.Match(literal)
verifyLen(r, 2, t)
verifyMember(r, a, t)
verifyMember(r, b, t)
s.Remove(fwc, b)
r = s.Match(literal)
verifyLen(r, 1, t)
verifyMember(r, a, t)
}
// checkBool fails the test when b does not equal expected, dumping the
// stack first so the offending call site is visible.
func checkBool(b, expected bool, t *testing.T) {
	if b == expected {
		return
	}
	debug.PrintStack()
	t.Fatalf("Expected %v, but got %v\n", expected, b)
}
func TestValidLiteralSubjects(t *testing.T) {
checkBool(IsValidLiteralSubject([]byte("foo")), true, t)
checkBool(IsValidLiteralSubject([]byte(".foo")), false, t)
checkBool(IsValidLiteralSubject([]byte("foo.")), false, t)
checkBool(IsValidLiteralSubject([]byte("foo..bar")), false, t)
checkBool(IsValidLiteralSubject([]byte("foo.bar.*")), false, t)
checkBool(IsValidLiteralSubject([]byte("foo.bar.>")), false, t)
checkBool(IsValidLiteralSubject([]byte("*")), false, t)
checkBool(IsValidLiteralSubject([]byte(">")), false, t)
}
func TestMatchLiterals(t *testing.T) {
checkBool(matchLiteral([]byte("foo"), []byte("foo")), true, t)
checkBool(matchLiteral([]byte("foo"), []byte("bar")), false, t)
checkBool(matchLiteral([]byte("foo"), []byte("*")), true, t)
checkBool(matchLiteral([]byte("foo"), []byte(">")), true, t)
checkBool(matchLiteral([]byte("foo.bar"), []byte(">")), true, t)
checkBool(matchLiteral([]byte("foo.bar"), []byte("foo.>")), true, t)
checkBool(matchLiteral([]byte("foo.bar"), []byte("bar.>")), false, t)
checkBool(matchLiteral([]byte("stats.test.22"), []byte("stats.>")), true, t)
checkBool(matchLiteral([]byte("stats.test.22"), []byte("stats.*.*")), true, t)
checkBool(matchLiteral([]byte("foo.bar"), []byte("foo")), false, t)
checkBool(matchLiteral([]byte("stats.test.foos"), []byte("stats.test.foos")), true, t)
checkBool(matchLiteral([]byte("stats.test.foos"), []byte("stats.test.foo")), false, t)
}
func TestCacheBounds(t *testing.T) {
s := New()
s.Insert([]byte("cache.>"), "foo")
tmpl := "cache.test.%d"
loop := s.cmax + 100
for i := 0; i < loop; i++ {
sub := []byte(fmt.Sprintf(tmpl, i))
s.Match(sub)
}
cs := int(s.cache.Count())
if cs > s.cmax {
t.Fatalf("Cache is growing past limit: %d vs %d\n", cs, s.cmax)
}
}
func TestStats(t *testing.T) {
s := New()
s.Insert([]byte("stats.>"), "fwc")
tmpl := "stats.test.%d"
loop := 255
total := uint32(loop + 1)
for i := 0; i < loop; i++ {
sub := []byte(fmt.Sprintf(tmpl, i))
s.Insert(sub, "l")
}
stats := s.Stats()
if time.Since(stats.StatsTime) > 50*time.Millisecond {
t.Fatalf("StatsTime seems incorrect: %+v\n", stats.StatsTime)
}
if stats.NumSubs != total {
t.Fatalf("Wrong stats for NumSubs: %d vs %d\n", stats.NumSubs, total)
}
if stats.NumInserts != uint64(total) {
t.Fatalf("Wrong stats for NumInserts: %d vs %d\n", stats.NumInserts, total)
}
if stats.NumRemoves != 0 {
t.Fatalf("Wrong stats for NumRemoves: %d vs %d\n", stats.NumRemoves, 0)
}
if stats.NumMatches != 0 {
t.Fatalf("Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, 0)
}
for i := 0; i < loop; i++ {
s.Match([]byte("stats.test.22"))
}
s.Insert([]byte("stats.*.*"), "pwc")
s.Match([]byte("stats.test.22"))
stats = s.Stats()
if stats.NumMatches != uint64(loop+1) {
t.Fatalf("Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, loop+1)
}
expectedCacheHitRate := 255.0 / 256.0
if stats.CacheHitRate != expectedCacheHitRate {
t.Fatalf("Wrong stats for CacheHitRate: %.3g vs %0.3g\n", stats.CacheHitRate, expectedCacheHitRate)
}
if stats.MaxFanout != 3 {
t.Fatalf("Wrong stats for MaxFanout: %d vs %d\n", stats.MaxFanout, 3)
}
if stats.AvgFanout != 3.0 {
t.Fatalf("Wrong stats for AvgFanout: %g vs %g\n", stats.AvgFanout, 3.0)
}
s.ResetStats()
stats = s.Stats()
if time.Since(stats.StatsTime) > 50*time.Millisecond {
t.Fatalf("After Reset: StatsTime seems incorrect: %+v\n", stats.StatsTime)
}
if stats.NumInserts != 0 {
t.Fatalf("After Reset: Wrong stats for NumInserts: %d vs %d\n", stats.NumInserts, 0)
}
if stats.NumRemoves != 0 {
t.Fatalf("After Reset: Wrong stats for NumRemoves: %d vs %d\n", stats.NumRemoves, 0)
}
if stats.NumMatches != 0 {
t.Fatalf("After Reset: Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, 0)
}
if stats.CacheHitRate != 0.0 {
t.Fatalf("After Reset: Wrong stats for CacheHitRate: %.3g vs %0.3g\n", stats.CacheHitRate, 0.0)
}
}
func TestResultSetSnapshots(t *testing.T) {
// Make sure result sets do not change out from underneath of us.
literal := []byte("a.b.c.d.e.f")
wc := []byte("a.b.c.>")
value := "xxx"
s := New()
s.Insert(literal, value)
r := s.Match(literal)
verifyLen(r, 1, t)
s.Insert(wc, value)
s.Insert(wc, value)
verifyLen(r, 1, t)
s.Remove(wc, value)
verifyLen(r, 1, t)
}
func TestBadSubjectOnRemove(t *testing.T) {
bad := []byte("a.b..d")
value := "bad"
s := New()
if err := s.Insert(bad, value); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject, got %v\n", err)
}
if err := s.Remove(bad, value); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject, got %v\n", err)
}
badfwc := []byte("a.>.b")
if err := s.Remove(badfwc, value); err != ErrInvalidSubject {
t.Fatalf("Expected ErrInvalidSubject, got %v\n", err)
}
}
// This is from bug report #18
func TestTwoTokenPubMatchSingleTokenSub(t *testing.T) {
s := New()
val := "a"
sub := []byte("foo")
s.Insert(sub, val)
r := s.Match(sub)
verifyLen(r, 1, t)
verifyMember(r, val, t)
r = s.Match([]byte("foo.bar"))
verifyLen(r, 0, t)
}
// -- Benchmarks Setup --
// subs accumulates every generated benchmark subject (up to 5 tokens deep).
var subs [][]byte
// toks are the token values combined to build benchmark subjects.
var toks = []string{"apcera", "continuum", "component", "router", "api", "imgr", "jmgr", "auth"}
// sl is the shared sublist the benchmarks match against.
var sl = New()
var results = make([]interface{}, 0, 64)
// init generates all benchmark subjects, inserts each into the shared
// sublist, and adds the wildcard subscriptions.
func init() {
	subs = make([][]byte, 0, 256*1024)
	subsInit("")
	for i := 0; i < len(subs); i++ {
		sl.Insert(subs[i], subs[i])
	}
	addWildcards()
}
// subsInit recursively generates all subject permutations of the toks
// values up to 5 tokens deep, appending each to subs.
func subsInit(pre string) {
	var sub string
	for _, t := range toks {
		if len(pre) > 0 {
			sub = pre + "." + t
		} else {
			sub = t
		}
		subs = append(subs, []byte(sub))
		// Recurse until subjects reach 5 tokens.
		if len(strings.Split(sub, ".")) < 5 {
			subsInit(sub)
		}
	}
}
// addWildcards inserts the wildcard subscriptions exercised by the
// multi-result benchmarks.
func addWildcards() {
	sl.Insert([]byte("cloud.>"), "paas")
	sl.Insert([]byte("cloud.continuum.component.>"), "health")
	sl.Insert([]byte("cloud.*.*.router.*"), "traffic")
}
// -- Benchmarks Setup End --
func Benchmark______________________Insert(b *testing.B) {
s := New()
for i, l := 0, len(subs); i < b.N; i++ {
index := i % l
s.Insert(subs[index], subs[index])
}
}
func Benchmark____________MatchSingleToken(b *testing.B) {
s := []byte("apcera")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
func Benchmark______________MatchTwoTokens(b *testing.B) {
s := []byte("apcera.continuum")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
func Benchmark____________MatchThreeTokens(b *testing.B) {
s := []byte("apcera.continuum.component")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
func Benchmark_____________MatchFourTokens(b *testing.B) {
s := []byte("apcera.continuum.component.router")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
func Benchmark_MatchFourTokensSingleResult(b *testing.B) {
s := []byte("apcera.continuum.component.router")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
func Benchmark_MatchFourTokensMultiResults(b *testing.B) {
s := []byte("cloud.continuum.component.router")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
func Benchmark_______MissOnLastTokenOfFive(b *testing.B) {
s := []byte("apcera.continuum.component.router.ZZZZ")
for i := 0; i < b.N; i++ {
sl.Match(s)
}
}
// multiRead runs num goroutines that each call sl.Match b.N times on the
// same subject, measuring Match throughput under reader concurrency.
// swg acts as a start barrier so all readers begin together; fwg waits
// for them all to finish.
func multiRead(b *testing.B, num int) {
	b.StopTimer()
	var swg, fwg sync.WaitGroup
	swg.Add(num)
	fwg.Add(num)
	s := []byte("apcera.continuum.component.router")
	for i := 0; i < num; i++ {
		go func() {
			// Signal readiness and block until every goroutine is ready.
			swg.Done()
			swg.Wait()
			for i := 0; i < b.N; i++ {
				sl.Match(s)
			}
			fwg.Done()
		}()
	}
	// Only start the clock once all readers have reached the barrier.
	swg.Wait()
	b.StartTimer()
	fwg.Wait()
}
func Benchmark_____________10XMultipleReads(b *testing.B) {
multiRead(b, 10)
}
func Benchmark____________100XMultipleReads(b *testing.B) {
multiRead(b, 100)
}
// _BenchmarkRSS is a manual memory-inspection helper; the underscore
// prefix keeps it out of normal benchmark runs. It forces a GC, prints
// heap statistics, then sleeps so resident memory can be inspected
// externally (e.g. via ps/top).
func _BenchmarkRSS(b *testing.B) {
	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	println("HEAP:", m.HeapObjects)
	println("ALLOC:", m.Alloc)
	println("TOTAL ALLOC:", m.TotalAlloc)
	// Use a time.Duration expression rather than the raw nanosecond
	// literal 30 * 1e9, which obscures the unit.
	time.Sleep(30 * time.Second)
}