From df02bc0bcf90717889aa9ab9189162fb43d6dc3a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 2 Apr 2016 12:13:28 -0700 Subject: [PATCH] Removed sublist, hash and hashmap, no longer needed. --- .travis.yml | 1 + TODO.md | 6 +- conf/parse.go | 3 +- hash/hash.go | 204 --------------- hash/hash_test.go | 201 --------------- hash/results.txt | 115 --------- hashmap/hashmap.go | 257 ------------------- hashmap/hashmap_test.go | 348 ------------------------- hashmap/rand_evict.go | 43 ---- hashmap/results.txt | 52 ---- scripts/cov.sh | 3 - server/client.go | 51 ++-- server/client_test.go | 12 +- server/monitor.go | 27 +- server/route.go | 17 +- server/server.go | 12 +- server/split_test.go | 35 ++- server/sublist.go | 532 ++++++++++++++++++++++++++++++++++++++ server/sublist_test.go | 457 +++++++++++++++++++++++++++++++++ sublist/sublist.go | 555 ---------------------------------------- sublist/sublist_test.go | 529 -------------------------------------- 21 files changed, 1071 insertions(+), 2389 deletions(-) delete mode 100644 hash/hash.go delete mode 100644 hash/hash_test.go delete mode 100644 hash/results.txt delete mode 100644 hashmap/hashmap.go delete mode 100644 hashmap/hashmap_test.go delete mode 100644 hashmap/rand_evict.go delete mode 100644 hashmap/results.txt create mode 100644 server/sublist.go create mode 100644 server/sublist_test.go delete mode 100644 sublist/sublist.go delete mode 100644 sublist/sublist_test.go diff --git a/.travis.yml b/.travis.yml index b18ca58e..3978e29d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,6 @@ language: go go: +- 1.6 - 1.5 install: - go get github.com/nats-io/nats diff --git a/TODO.md b/TODO.md index 50422f4c..b4b537be 100644 --- a/TODO.md +++ b/TODO.md @@ -1,6 +1,8 @@ # General +- [ ] Multiple listen endpoints +- [ ] Listen configure key vs addr and port - [ ] Multiple Auth - [ ] Authorization / Access - [ ] T series reservations @@ -8,7 +10,6 @@ - [ ] No downtime restart - [ ] Signal based reload of configuration - [ ] Dynamic socket buffer sizes -- [ ] Switch to 1.4/1.5 and use maps vs hashmaps in sublist - [ ] brew, apt-get, rpm, chocately (windows) - [ ] Sublist better at high concurrency, cache uses writelock always currently - [ ] Buffer pools/sync pools? @@ -19,8 +20,9 @@ - [ ] Memory limits/warnings? - [ ] Limit number of subscriptions a client can have, total memory usage etc. - [ ] Info updates contain other implicit route servers -- [ ] Multi-tenant accounts with isolation of subject space - [ ] Pedantic state +- [ ] Multi-tenant accounts with isolation of subject space +- [X] Switch to 1.4/1.5 and use maps vs hashmaps in sublist - [X] NewSource on Rand to lower lock contention on QueueSubs, or redesign! - [X] Default sort by cid on connz - [X] Track last activity time per connection? diff --git a/conf/parse.go b/conf/parse.go index 2d2a4fd6..19d35636 100644 --- a/conf/parse.go +++ b/conf/parse.go @@ -1,4 +1,4 @@ -// Copyright 2013 Apcera Inc. All rights reserved. +// Copyright 2013-2016 Apcera Inc. All rights reserved. // Package conf supports a configuration file format used by gnatsd. It is // a flexible format that combines the best of traditional @@ -46,7 +46,6 @@ func Parse(data string) (map[string]interface{}, error) { } func parse(data string) (p *parser, err error) { - p = &parser{ mapping: make(map[string]interface{}), lx: lex(data), diff --git a/hash/hash.go b/hash/hash.go deleted file mode 100644 index 1eb39563..00000000 --- a/hash/hash.go +++ /dev/null @@ -1,204 +0,0 @@ -// Copyright 2012 Apcera Inc. All rights reserved. - -// Package hash is a collection of high performance 32-bit hash functions. -package hash - -import ( - "unsafe" -) - -// Bernstein Hash. -func Bernstein(data []byte) uint32 { - hash := uint32(5381) - for _, b := range data { - hash = ((hash << 5) + hash) + uint32(b) - } - return hash -} - -// Constants for FNV1A and derivatives -const ( - _OFF32 = 2166136261 - _P32 = 16777619 - _YP32 = 709607 -) - -// FNV1A Hash [http://en.wikipedia.org/wiki/Fowler-Noll-Vo_hash_function] -func FNV1A(data []byte) uint32 { - var hash uint32 = _OFF32 - for _, c := range data { - hash ^= uint32(c) - hash *= _P32 - } - return hash -} - -// Constants for multiples of sizeof(WORD) -const ( - _WSZ = 4 // 4 - _DWSZ = _WSZ << 1 // 8 - _DDWSZ = _WSZ << 2 // 16 - _DDDWSZ = _WSZ << 3 // 32 -) - -// Jesteress derivative of FNV1A from [http://www.sanmayce.com/Fastest_Hash/] -func Jesteress(data []byte) uint32 { - h32 := uint32(_OFF32) - i, dlen := 0, len(data) - - for ; dlen >= _DDWSZ; dlen -= _DDWSZ { - k1 := *(*uint64)(unsafe.Pointer(&data[i])) - k2 := *(*uint64)(unsafe.Pointer(&data[i+4])) - h32 = uint32((uint64(h32) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32) - i += _DDWSZ - } - - // Cases: 0,1,2,3,4,5,6,7 - if (dlen & _DWSZ) > 0 { - k1 := *(*uint64)(unsafe.Pointer(&data[i])) - h32 = uint32(uint64(h32)^k1) * _YP32 - i += _DWSZ - } - if (dlen & _WSZ) > 0 { - k1 := *(*uint32)(unsafe.Pointer(&data[i])) - h32 = (h32 ^ k1) * _YP32 - i += _WSZ - } - if (dlen & 1) > 0 { - h32 = (h32 ^ uint32(data[i])) * _YP32 - } - return h32 ^ (h32 >> 16) -} - -// Meiyan derivative of FNV1A from [http://www.sanmayce.com/Fastest_Hash/] -func Meiyan(data []byte) uint32 { - h32 := uint32(_OFF32) - i, dlen := 0, len(data) - - for ; dlen >= _DDWSZ; dlen -= _DDWSZ { - k1 := *(*uint64)(unsafe.Pointer(&data[i])) - k2 := *(*uint64)(unsafe.Pointer(&data[i+4])) - h32 = uint32((uint64(h32) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32) - i += _DDWSZ - } - - // Cases: 0,1,2,3,4,5,6,7 - if (dlen & _DWSZ) > 0 { - k1 := *(*uint64)(unsafe.Pointer(&data[i])) - h32 = uint32(uint64(h32)^k1) * _YP32 - i += _WSZ - k1 = *(*uint64)(unsafe.Pointer(&data[i])) - h32 = uint32(uint64(h32)^k1) * _YP32 - i += _WSZ - } - if (dlen & _WSZ) > 0 { - k1 := *(*uint32)(unsafe.Pointer(&data[i])) - h32 = (h32 ^ k1) * _YP32 - i += _WSZ - } - if (dlen & 1) > 0 { - h32 = (h32 ^ uint32(data[i])) * _YP32 - } - return h32 ^ (h32 >> 16) -} - -// Yorikke derivative of FNV1A from [http://www.sanmayce.com/Fastest_Hash/] -func Yorikke(data []byte) uint32 { - h32 := uint32(_OFF32) - h32b := uint32(_OFF32) - i, dlen := 0, len(data) - - for ; dlen >= _DDDWSZ; dlen -= _DDDWSZ { - k1 := *(*uint64)(unsafe.Pointer(&data[i])) - k2 := *(*uint64)(unsafe.Pointer(&data[i+4])) - h32 = uint32((uint64(h32) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32) - k1 = *(*uint64)(unsafe.Pointer(&data[i+8])) - k2 = *(*uint64)(unsafe.Pointer(&data[i+12])) - h32b = uint32((uint64(h32b) ^ ((k1<<5 | k1>>27) ^ k2)) * _YP32) - i += _DDDWSZ - } - if (dlen & _DDWSZ) > 0 { - k1 := *(*uint64)(unsafe.Pointer(&data[i])) - k2 := *(*uint64)(unsafe.Pointer(&data[i+4])) - h32 = uint32((uint64(h32) ^ k1) * _YP32) - h32b = uint32((uint64(h32b) ^ k2) * _YP32) - i += _DDWSZ - } - // Cases: 0,1,2,3,4,5,6,7 - if (dlen & _DWSZ) > 0 { - k1 := *(*uint32)(unsafe.Pointer(&data[i])) - k2 := *(*uint32)(unsafe.Pointer(&data[i+2])) - h32 = (h32 ^ k1) * _YP32 - h32b = (h32b ^ k2) * _YP32 - i += _DWSZ - } - if (dlen & _WSZ) > 0 { - k1 := *(*uint32)(unsafe.Pointer(&data[i])) - h32 = (h32 ^ k1) * _YP32 - i += _WSZ - } - if (dlen & 1) > 0 { - h32 = (h32 ^ uint32(data[i])) * _YP32 - } - h32 = (h32 ^ (h32b<<5 | h32b>>27)) * _YP32 - return h32 ^ (h32 >> 16) -} - -// Constants defined by the Murmur3 algorithm -const ( - _C1 = uint32(0xcc9e2d51) - _C2 = uint32(0x1b873593) - _F1 = uint32(0x85ebca6b) - _F2 = uint32(0xc2b2ae35) -) - -// M3Seed is a default seed for Murmur3 -const M3Seed = uint32(0x9747b28c) - -// Murmur3 Hash [http://code.google.com/p/smhasher/wiki/MurmurHash3] -// Does not generate intermediate objects. -func Murmur3(data []byte, seed uint32) uint32 { - h1 := seed - ldata := len(data) - end := ldata - (ldata % 4) - i := 0 - - // Inner - for ; i < end; i += 4 { - k1 := *(*uint32)(unsafe.Pointer(&data[i])) - k1 *= _C1 - k1 = (k1 << 15) | (k1 >> 17) - k1 *= _C2 - - h1 ^= k1 - h1 = (h1 << 13) | (h1 >> 19) - h1 = h1*5 + 0xe6546b64 - } - - // Tail - var k1 uint32 - switch ldata - i { - case 3: - k1 |= uint32(data[i+2]) << 16 - fallthrough - case 2: - k1 |= uint32(data[i+1]) << 8 - fallthrough - case 1: - k1 |= uint32(data[i]) - k1 *= _C1 - k1 = (k1 << 15) | (k1 >> 17) - k1 *= _C2 - h1 ^= k1 - } - - // Finalization - h1 ^= uint32(ldata) - h1 ^= (h1 >> 16) - h1 *= _F1 - h1 ^= (h1 >> 13) - h1 *= _F2 - h1 ^= (h1 >> 16) - - return h1 -} diff --git a/hash/hash_test.go b/hash/hash_test.go deleted file mode 100644 index 4e0a6237..00000000 --- a/hash/hash_test.go +++ /dev/null @@ -1,201 +0,0 @@ -package hash - -import ( - "math/rand" - "testing" -) - -var keys = [][]byte{ - []byte("foo"), - []byte("bar"), - []byte("apcera.continuum.router.foo.bar"), - []byte("apcera.continuum.router.foo.bar.baz"), -} - -func TestBernstein(t *testing.T) { - results := []uint32{193491849, 193487034, 2487287557, 3139297488} - for i, key := range keys { - h := Bernstein(key) - if h != results[i] { - t.Fatalf("hash is incorrect, expected %d, got %d\n", - results[i], h) - } - } -} - -func TestFNV1A(t *testing.T) { - results := []uint32{2851307223, 1991736602, 1990810707, 1244015104} - for i, key := range keys { - h := FNV1A(key) - if h != results[i] { - t.Fatalf("hash is incorrect, expected %d, got %d\n", - results[i], h) - } - } -} - -func TestJesteress(t *testing.T) { - results := []uint32{1058908168, 1061739001, 4242539713, 3332038527} - for i, key := range keys { - h := Jesteress(key) - if h != results[i] { - t.Fatalf("hash is incorrect, expected %d, got %d\n", - results[i], h) - } - } -} - -func TestMeiyan(t *testing.T) { - results := []uint32{1058908168, 1061739001, 2891236487, 3332038527} - for i, key := range keys { - h := Meiyan(key) - if h != results[i] { - t.Fatalf("hash is incorrect, expected %d, got %d\n", - results[i], h) - } - } -} - -func TestYorikke(t *testing.T) { - results := []uint32{3523423968, 2222334353, 407908456, 359111667} - for i, key := range keys { - h := Yorikke(key) - if h != results[i] { - t.Fatalf("hash is incorrect, expected %d, got %d\n", - results[i], h) - } - } -} - -func TestMurmur3(t *testing.T) { - results := []uint32{659908353, 522989004, 135963738, 990328005} - for i, key := range keys { - h := Murmur3(key, M3Seed) - if h != results[i] { - t.Fatalf("hash is incorrect, expected %d, got %d\n", - results[i], h) - } - } -} - -var ch = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@$#%^&*()") - -func sizedBytes(sz int) []byte { - b := make([]byte, sz) - for i := range b { - b[i] = ch[rand.Intn(len(ch))] - } - return b -} - -var smlKey = sizedBytes(8) -var medKey = sizedBytes(32) -var lrgKey = sizedBytes(256) - -func Benchmark_Bernstein_SmallKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Bernstein(smlKey) - } -} - -func Benchmark_Murmur3___SmallKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Murmur3(smlKey, M3Seed) - } -} - -func Benchmark_FNV1A_____SmallKey(b *testing.B) { - for i := 0; i < b.N; i++ { - FNV1A(smlKey) - } -} - -func Benchmark_Meiyan____SmallKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Meiyan(smlKey) - } -} - -func Benchmark_Jesteress_SmallKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Jesteress(smlKey) - } -} - -func Benchmark_Yorikke___SmallKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Yorikke(smlKey) - } -} - -func Benchmark_Bernstein___MedKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Bernstein(medKey) - } -} - -func Benchmark_Murmur3_____MedKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Murmur3(medKey, M3Seed) - } -} - -func Benchmark_FNV1A_______MedKey(b *testing.B) { - for i := 0; i < b.N; i++ { - FNV1A(medKey) - } -} - -func Benchmark_Meiyan______MedKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Meiyan(medKey) - } -} - -func Benchmark_Jesteress___MedKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Jesteress(medKey) - } -} - -func Benchmark_Yorikke_____MedKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Yorikke(medKey) - } -} - -func Benchmark_Bernstein___LrgKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Bernstein(lrgKey) - } -} - -func Benchmark_Murmur3_____LrgKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Murmur3(lrgKey, M3Seed) - } -} - -func Benchmark_FNV1A_______LrgKey(b *testing.B) { - for i := 0; i < b.N; i++ { - FNV1A(lrgKey) - } -} - -func Benchmark_Meiyan______LrgKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Meiyan(lrgKey) - } -} - -func Benchmark_Jesteress___LrgKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Jesteress(lrgKey) - } -} - -func Benchmark_Yorikke_____LrgKey(b *testing.B) { - for i := 0; i < b.N; i++ { - Yorikke(lrgKey) - } -} diff --git a/hash/results.txt b/hash/results.txt deleted file mode 100644 index e0c26352..00000000 --- a/hash/results.txt +++ /dev/null @@ -1,115 +0,0 @@ -NOTE: I used SetBytes(1) to give quick estimate of ops/sec - -2013 MacbookAir 11" i7 1.7Ghz Haswell - -================ -OSX - Mavericks 10.9.2 -Go version go1.2.1 -================ - -Benchmark_Bernstein_SmallKey 500000000 5.13 ns/op 195.10 MB/s -Benchmark_Murmur3___SmallKey 200000000 8.11 ns/op 123.26 MB/s -Benchmark_FNV1A_____SmallKey 500000000 5.07 ns/op 197.36 MB/s -Benchmark_Meiyan____SmallKey 500000000 4.24 ns/op 236.02 MB/s -Benchmark_Jesteress_SmallKey 500000000 5.32 ns/op 188.08 MB/s -Benchmark_Yorikke___SmallKey 500000000 5.52 ns/op 181.20 MB/s -Benchmark_Bernstein___MedKey 50000000 34.9 ns/op 28.65 MB/s -Benchmark_Murmur3_____MedKey 100000000 17.9 ns/op 55.94 MB/s -Benchmark_FNV1A_______MedKey 50000000 31.9 ns/op 31.37 MB/s -Benchmark_Meiyan______MedKey 200000000 9.28 ns/op 107.76 MB/s -Benchmark_Jesteress___MedKey 200000000 8.15 ns/op 122.65 MB/s -Benchmark_Yorikke_____MedKey 200000000 9.19 ns/op 108.83 MB/s - - - -2012 MacbookAir 11" i7 2Ghz - -================ -OSX - Mountain Lion -Go version go1.1 -================ - -Benchmark_Bernstein_SmallKey 500000000 5.19 ns/op 192.58 MB/s -Benchmark_Murmur3___SmallKey 200000000 7.98 ns/op 125.31 MB/s -Benchmark_FNV1A_____SmallKey 500000000 4.89 ns/op 204.30 MB/s -Benchmark_Meiyan____SmallKey 500000000 5.13 ns/op 195.10 MB/s -Benchmark_Jesteress_SmallKey 500000000 5.11 ns/op 195.52 MB/s -Benchmark_Yorikke___SmallKey 500000000 5.43 ns/op 184.01 MB/s -Benchmark_Bernstein___MedKey 50000000 39.10 ns/op 25.60 MB/s -Benchmark_Murmur3_____MedKey 100000000 18.70 ns/op 53.45 MB/s -Benchmark_FNV1A_______MedKey 50000000 32.10 ns/op 31.16 MB/s -Benchmark_Meiyan______MedKey 500000000 7.79 ns/op 128.33 MB/s -Benchmark_Jesteress___MedKey 500000000 7.05 ns/op 141.81 MB/s -Benchmark_Yorikke_____MedKey 500000000 7.51 ns/op 133.19 MB/s - -================ -OSX - Mountain Lion -Go version go1.0.3 -================ - -go test --bench="." -gcflags="-B" - -Benchmark_Bernstein_SmallKey 200000000 9.17 ns/op 109.03 MB/s -Benchmark_Murmur3___SmallKey 200000000 9.68 ns/op 103.27 MB/s -Benchmark_FNV1A_____SmallKey 200000000 9.21 ns/op 108.58 MB/s -Benchmark_Meiyan____SmallKey 500000000 6.34 ns/op 157.82 MB/s -Benchmark_Jesteress_SmallKey 500000000 6.37 ns/op 157.01 MB/s -Benchmark_Yorikke___SmallKey 500000000 6.98 ns/op 143.34 MB/s -Benchmark_Bernstein___MedKey 50000000 53.4 ns/op 18.71 MB/s -Benchmark_Murmur3_____MedKey 100000000 29.8 ns/op 33.56 MB/s -Benchmark_FNV1A_______MedKey 50000000 52.6 ns/op 19.02 MB/s -Benchmark_Meiyan______MedKey 200000000 8.84 ns/op 113.14 MB/s -Benchmark_Jesteress___MedKey 200000000 7.94 ns/op 125.90 MB/s -Benchmark_Yorikke_____MedKey 200000000 9.48 ns/op 105.50 MB/s - - -================ -Ubunutu 12.10 -Go version go1.0.3 -gcc version 4.7.2 (Ubuntu/Linaro 4.7.2-4precise1) -================ - -go test --bench="." -gcflags="-B" - -Benchmark_Bernstein_SmallKey 200000000 9.90 ns/op 101.06 MB/s -Benchmark_Murmur3___SmallKey 100000000 10.1 ns/op 98.96 MB/s -Benchmark_FNV1A_____SmallKey 200000000 9.29 ns/op 107.59 MB/s -Benchmark_Meiyan____SmallKey 500000000 6.15 ns/op 162.50 MB/s -Benchmark_Jesteress_SmallKey 500000000 6.78 ns/op 147.58 MB/s -Benchmark_Yorikke___SmallKey 500000000 7.17 ns/op 139.49 MB/s -Benchmark_Bernstein___MedKey 50000000 55.0 ns/op 18.18 MB/s -Benchmark_Murmur3_____MedKey 50000000 30.2 ns/op 33.13 MB/s -Benchmark_FNV1A_______MedKey 50000000 56.0 ns/op 17.86 MB/s -Benchmark_Meiyan______MedKey 200000000 9.14 ns/op 109.43 MB/s -Benchmark_Jesteress___MedKey 200000000 8.25 ns/op 121.24 MB/s -Benchmark_Yorikke_____MedKey 200000000 9.72 ns/op 102.91 MB/s - -go test --bench="." -compiler gccgo -gccgoflags="-O2" -gcflags="-B" - -Benchmark_Bernstein_SmallKey 500000000 4.70 ns/op 212.97 MB/s -Benchmark_Murmur3___SmallKey 200000000 8.18 ns/op 122.21 MB/s -Benchmark_FNV1A_____SmallKey 500000000 5.18 ns/op 193.17 MB/s -Benchmark_Meiyan____SmallKey 500000000 6.21 ns/op 161.13 MB/s -Benchmark_Jesteress_SmallKey 500000000 5.51 ns/op 181.38 MB/s -Benchmark_Yorikke___SmallKey 500000000 7.13 ns/op 140.19 MB/s -Benchmark_Bernstein___MedKey 100000000 27.8 ns/op 35.98 MB/s -Benchmark_Murmur3_____MedKey 100000000 19.1 ns/op 52.46 MB/s -Benchmark_FNV1A_______MedKey 50000000 34.7 ns/op 28.79 MB/s -Benchmark_Meiyan______MedKey 500000000 7.24 ns/op 138.10 MB/s -Benchmark_Jesteress___MedKey 500000000 6.58 ns/op 151.97 MB/s -Benchmark_Yorikke_____MedKey 200000000 9.08 ns/op 110.17 MB/s - -go test --bench="." -compiler gccgo -gccgoflags="-O3" -gcflags="-B" - -Benchmark_Bernstein_SmallKey 2000000000 1.80 ns/op 555.22 MB/s -Benchmark_Murmur3___SmallKey 200000000 8.07 ns/op 123.86 MB/s -Benchmark_FNV1A_____SmallKey 2000000000 1.89 ns/op 528.93 MB/s -Benchmark_Meiyan____SmallKey 500000000 6.20 ns/op 161.36 MB/s -Benchmark_Jesteress_SmallKey 500000000 5.47 ns/op 182.76 MB/s -Benchmark_Yorikke___SmallKey 500000000 7.11 ns/op 140.58 MB/s -Benchmark_Bernstein___MedKey 100000000 20.3 ns/op 49.18 MB/s -Benchmark_Murmur3_____MedKey 100000000 19.0 ns/op 52.58 MB/s -Benchmark_FNV1A_______MedKey 100000000 20.3 ns/op 49.35 MB/s -Benchmark_Meiyan______MedKey 500000000 6.99 ns/op 143.00 MB/s -Benchmark_Jesteress___MedKey 500000000 6.44 ns/op 155.23 MB/s -Benchmark_Yorikke_____MedKey 200000000 9.01 ns/op 110.98 MB/s diff --git a/hashmap/hashmap.go b/hashmap/hashmap.go deleted file mode 100644 index 0edc6aec..00000000 --- a/hashmap/hashmap.go +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2012-2015 Apcera Inc. All rights reserved. - -// Package hashmap defines a high performance hashmap based on -// fast hashing and fast key comparison. Simple chaining -// is used, relying on the hashing algorithms for good -// distribution -package hashmap - -import ( - "bytes" - "errors" - "unsafe" - - "github.com/nats-io/gnatsd/hash" -) - -// HashMap stores Entry items using a given Hash function. -// The Hash function can be overridden. -type HashMap struct { - Hash func([]byte) uint32 - bkts []*Entry - msk uint32 - used uint32 - rsz bool -} - -// Entry represents what the map is actually storing. -// Uses simple linked list resolution for collisions. -type Entry struct { - hk uint32 - key []byte - data interface{} - next *Entry -} - -// BucketSize, must be power of 2 -const _BSZ = 8 - -// Constants for multiples of sizeof(WORD) -const ( - _WSZ = 4 // 4 - _DWSZ = _WSZ << 1 // 8 -) - -// DefaultHash to be used unless overridden. -var DefaultHash = hash.Jesteress - -// Stats are reported on HashMaps -type Stats struct { - NumElements uint32 - NumSlots uint32 - NumBuckets uint32 - LongChain uint32 - AvgChain float32 -} - -// NewWithBkts creates a new HashMap using the bkts slice argument. -// len(bkts) must be a power of 2. -func NewWithBkts(bkts []*Entry) (*HashMap, error) { - l := len(bkts) - if l == 0 || (l&(l-1) != 0) { - return nil, errors.New("Size of buckets must be power of 2") - } - h := HashMap{} - h.msk = uint32(l - 1) - h.bkts = bkts - h.Hash = DefaultHash - h.rsz = true - return &h, nil -} - -// New creates a new HashMap of default size and using the default -// Hashing algorithm. -func New() *HashMap { - h, _ := NewWithBkts(make([]*Entry, _BSZ)) - return h -} - -// Set will set the key item to data. This will blindly replace any item -// that may have been at key previous. -func (h *HashMap) Set(key []byte, data interface{}) { - hk := h.Hash(key) - e := h.bkts[hk&h.msk] - for e != nil { - if len(key) == len(e.key) && bytes.Equal(key, e.key) { - // Success, replace data field - e.data = data - return - } - e = e.next - } - // We have a new entry here - ne := &Entry{hk: hk, key: key, data: data} - ne.next = h.bkts[hk&h.msk] - h.bkts[hk&h.msk] = ne - h.used++ - // Check for resizing - if h.rsz && (h.used > uint32(len(h.bkts))) { - h.grow() - } -} - -// Get will return the item at key. -func (h *HashMap) Get(key []byte) interface{} { - hk := h.Hash(key) - e := h.bkts[hk&h.msk] - // FIXME: Reorder on GET if chained? - // We unroll and optimize the comparison of keys. - for e != nil { - i, klen := 0, len(key) - if klen != len(e.key) { - goto next - } - // We unroll and optimize the key comparison here. - // Compare _DWSZ at a time - for ; klen >= _DWSZ; klen -= _DWSZ { - k1 := *(*uint64)(unsafe.Pointer(&key[i])) - k2 := *(*uint64)(unsafe.Pointer(&e.key[i])) - if k1 != k2 { - goto next - } - i += _DWSZ - } - // Check by _WSZ if applicable - if (klen & _WSZ) > 0 { - k1 := *(*uint32)(unsafe.Pointer(&key[i])) - k2 := *(*uint32)(unsafe.Pointer(&e.key[i])) - if k1 != k2 { - goto next - } - i += _WSZ - } - // Compare what is left over, byte by byte - for ; i < len(key); i++ { - if key[i] != e.key[i] { - goto next - } - } - // Success - return e.data - next: - e = e.next - } - return nil -} - -// Remove will remove what is associated with key. -func (h *HashMap) Remove(key []byte) { - hk := h.Hash(key) - e := &h.bkts[hk&h.msk] - for *e != nil { - if len(key) == len((*e).key) && bytes.Equal(key, (*e).key) { - // Success - *e = (*e).next - h.used-- - // Check for resizing - lbkts := uint32(len(h.bkts)) - if h.rsz && lbkts > _BSZ && (h.used < lbkts/4) { - h.shrink() - } - return - } - e = &(*e).next - } -} - -// resize is responsible for reallocating the buckets and -// redistributing the hashmap entries. -func (h *HashMap) resize(nsz uint32) { - nmsk := nsz - 1 - bkts := make([]*Entry, nsz) - ents := make([]Entry, h.used) - var ne *Entry - var i int - for _, e := range h.bkts { - for ; e != nil; e = e.next { - ne, i = &ents[i], i+1 - *ne = *e - ne.next = bkts[e.hk&nmsk] - bkts[e.hk&nmsk] = ne - } - } - h.bkts = bkts - h.msk = nmsk -} - -const maxBktSize = (1 << 31) - 1 - -// grow the HashMap's buckets by 2 -func (h *HashMap) grow() { - // Can't grow beyond maxint for now - if len(h.bkts) >= maxBktSize { - return - } - h.resize(uint32(2 * len(h.bkts))) -} - -// shrink the HashMap's buckets by 2 -func (h *HashMap) shrink() { - if len(h.bkts) <= _BSZ { - return - } - h.resize(uint32(len(h.bkts) / 2)) -} - -// Count returns number of elements in the HashMap -func (h *HashMap) Count() uint32 { - return h.used -} - -// AllKeys will return all the keys stored in the HashMap -func (h *HashMap) AllKeys() [][]byte { - all := make([][]byte, 0, h.used) - for _, e := range h.bkts { - for ; e != nil; e = e.next { - all = append(all, e.key) - } - } - return all -} - -// All returns all the Entries in the map -func (h *HashMap) All() []interface{} { - all := make([]interface{}, 0, h.used) - for _, e := range h.bkts { - for ; e != nil; e = e.next { - all = append(all, e.data) - } - } - return all -} - -// Stats will collect general statistics about the HashMap -func (h *HashMap) Stats() *Stats { - lc, totalc, slots := 0, 0, 0 - for _, e := range h.bkts { - if e != nil { - slots++ - } - i := 0 - for ; e != nil; e = e.next { - i++ - if i > lc { - lc = i - } - } - totalc += i - } - l := uint32(len(h.bkts)) - avg := (float32(totalc) / float32(slots)) - return &Stats{ - NumElements: h.used, - NumBuckets: l, - LongChain: uint32(lc), - AvgChain: avg, - NumSlots: uint32(slots)} -} diff --git a/hashmap/hashmap_test.go b/hashmap/hashmap_test.go deleted file mode 100644 index b8b431d2..00000000 --- a/hashmap/hashmap_test.go +++ /dev/null @@ -1,348 +0,0 @@ -package hashmap - -import ( - "bytes" - "crypto/rand" - "encoding/hex" - "io" - "testing" -) - -func TestMapWithBkts(t *testing.T) { - bkts := make([]*Entry, 3, 3) - _, err := NewWithBkts(bkts) - if err == nil { - t.Fatalf("Buckets size of %d should have failed\n", len(bkts)) - } - bkts = make([]*Entry, 8, 8) - _, err = NewWithBkts(bkts) - if err != nil { - t.Fatalf("Buckets size of %d should have succeeded\n", len(bkts)) - } -} - -var foo = []byte("foo") -var bar = []byte("bar") -var baz = []byte("baz") -var med = []byte("foo.bar.baz") -var sub = []byte("apcera.continuum.router.foo.bar.baz") - -func TestHashMapBasics(t *testing.T) { - h := New() - - if h.used != 0 { - t.Fatalf("Wrong number of entries: %d vs 0\n", h.used) - } - h.Set(foo, bar) - if h.used != 1 { - t.Fatalf("Wrong number of entries: %d vs 1\n", h.used) - } - if v := h.Get(foo).([]byte); !bytes.Equal(v, bar) { - t.Fatalf("Did not receive correct answer: '%s' vs '%s'\n", bar, v) - } - h.Remove(foo) - if h.used != 0 { - t.Fatalf("Wrong number of entries: %d vs 0\n", h.used) - } - if v := h.Get(foo); v != nil { - t.Fatal("Did not receive correct answer, should be nil") - } -} - -const ( - INS = 100 - EXP = 128 - REM = 75 - EXP2 = 64 -) - -func TestGrowing(t *testing.T) { - h := New() - - if len(h.bkts) != _BSZ { - t.Fatalf("Initial bucket size is wrong: %d vs %d\n", len(h.bkts), _BSZ) - } - // Create _INBOX style end tokens - var toks [INS][]byte - for i := range toks { - u := make([]byte, 13) - io.ReadFull(rand.Reader, u) - toks[i] = []byte(hex.EncodeToString(u)) - h.Set(toks[i], toks[i]) - tg := h.Get(toks[i]).([]byte) - if !bytes.Equal(tg, toks[i]) { - t.Fatalf("Did not match properly, '%s' vs '%s'\n", tg, toks[i]) - } - } - if len(h.bkts) != EXP { - t.Fatalf("Expanded bucket size is wrong: %d vs %d\n", len(h.bkts), EXP) - } -} - -func TestHashMapCollisions(t *testing.T) { - h := New() - h.rsz = false - - // Create _INBOX style end tokens - var toks [INS][]byte - for i := range toks { - u := make([]byte, 13) - io.ReadFull(rand.Reader, u) - toks[i] = []byte(hex.EncodeToString(u)) - h.Set(toks[i], toks[i]) - tg := h.Get(toks[i]).([]byte) - if !bytes.Equal(tg, toks[i]) { - t.Fatalf("Did not match properly, '%s' vs '%s'\n", tg, toks[i]) - } - } - if len(h.bkts) != _BSZ { - t.Fatalf("Bucket size is wrong: %d vs %d\n", len(h.bkts), _BSZ) - } - h.grow() - if len(h.bkts) != 2*_BSZ { - t.Fatalf("Bucket size is wrong: %d vs %d\n", len(h.bkts), 2*_BSZ) - } - ti := 32 - tg := h.Get(toks[ti]).([]byte) - if !bytes.Equal(tg, toks[ti]) { - t.Fatalf("Did not match properly, '%s' vs '%s'\n", tg, toks[ti]) - } - - h.Remove(toks[99]) - rg := h.Get(toks[99]) - if rg != nil { - t.Fatalf("After remove should have been nil! '%s'\n", rg.([]byte)) - } -} - -func TestAll(t *testing.T) { - h := New() - h.Set([]byte("1"), 1) - h.Set([]byte("2"), 1) - h.Set([]byte("3"), 1) - all := h.All() - if len(all) != 3 { - t.Fatalf("Expected All() to return 3, but got %d\n", len(all)) - } - allkeys := h.AllKeys() - if len(allkeys) != 3 { - t.Fatalf("Expected All() to return 3, but got %d\n", len(allkeys)) - } -} - -func TestSetDoesReplaceOnExisting(t *testing.T) { - h := New() - k := []byte("key") - h.Set(k, "foo") - h.Set(k, "bar") - all := h.All() - if len(all) != 1 { - t.Fatalf("Set should replace, expected 1 vs %d\n", len(all)) - } - s, ok := all[0].(string) - if !ok { - t.Fatalf("Value is incorrect: %v\n", all[0].(string)) - } - if s != "bar" { - t.Fatalf("Value is incorrect, expected 'bar' vs '%s'\n", s) - } -} - -func TestCollision(t *testing.T) { - h := New() - k1 := []byte("999") - k2 := []byte("1000") - h.Set(k1, "foo") - h.Set(k2, "bar") - all := h.All() - if len(all) != 2 { - t.Fatalf("Expected 2 vs %d\n", len(all)) - } - if h.Get(k1) == nil { - t.Fatalf("Failed to get '999'\n") - } -} - -func TestHashMapStats(t *testing.T) { - h := New() - h.rsz = false - - // Create _INBOX style end tokens - var toks [INS][]byte - for i := range toks { - u := make([]byte, 13) - io.ReadFull(rand.Reader, u) - toks[i] = []byte(hex.EncodeToString(u)) - h.Set(toks[i], toks[i]) - tg := h.Get(toks[i]).([]byte) - if !bytes.Equal(tg, toks[i]) { - t.Fatalf("Did not match properly, '%s' vs '%s'\n", tg, toks[i]) - } - } - - s := h.Stats() - if s.NumElements != INS { - t.Fatalf("NumElements incorrect: %d vs %d\n", s.NumElements, INS) - } - if s.NumBuckets != _BSZ { - t.Fatalf("NumBuckets incorrect: %d vs %d\n", s.NumBuckets, _BSZ) - } - if s.AvgChain > 13 || s.AvgChain < 12 { - t.Fatalf("AvgChain out of bounds: %f vs %f\n", s.AvgChain, 12.5) - } - if s.LongChain > 25 { - t.Fatalf("LongChain out of bounds: %d vs %d\n", s.LongChain, 22) - } -} - -func TestShrink(t *testing.T) { - h := New() - - if len(h.bkts) != _BSZ { - t.Fatalf("Initial bucket size is wrong: %d vs %d\n", len(h.bkts), _BSZ) - } - // Create _INBOX style end tokens - var toks [INS][]byte - for i := range toks { - u := make([]byte, 13) - io.ReadFull(rand.Reader, u) - toks[i] = []byte(hex.EncodeToString(u)) - h.Set(toks[i], toks[i]) - tg := h.Get(toks[i]).([]byte) - if !bytes.Equal(tg, toks[i]) { - t.Fatalf("Did not match properly, '%s' vs '%s'\n", tg, toks[i]) - } - } - if len(h.bkts) != EXP { - t.Fatalf("Expanded bucket size is wrong: %d vs %d\n", len(h.bkts), EXP) - } - for i := 0; i < REM; i++ { - h.Remove(toks[i]) - } - if len(h.bkts) != EXP2 { - t.Fatalf("Shrunk bucket size is wrong: %d vs %d\n", len(h.bkts), EXP2) - } -} - -func TestFalseLookup(t *testing.T) { - h := New() - // DW + W - h.Set([]byte("cache.test.0"), "foo") - v := h.Get([]byte("cache.test.1")) - if v != nil { - t.Fatalf("Had a match when did not expect one!\n") - } - // DW + W + 3 - h.Set([]byte("cache.test.1234"), "foo") - v = h.Get([]byte("cache.test.0000")) - if v != nil { - t.Fatalf("Had a match when did not expect one!\n") - } -} - -func TestRemoveRandom(t *testing.T) { - h := New() - h.RemoveRandom() - - h.Set(foo, "1") - h.Set(bar, "1") - h.Set(baz, "1") - - if h.Count() != 3 { - t.Fatalf("Expected 3 members, got %d\n", h.Count()) - } - - h.RemoveRandom() - - if h.Count() != 2 { - t.Fatalf("Expected 2 members, got %d\n", h.Count()) - } -} - -func Benchmark_GoMap___GetSmallKey(b *testing.B) { - b.StopTimer() - m := make(map[string][]byte) - m["foo"] = bar - b.StartTimer() - - for i := 0; i < b.N; i++ { - _ = m["foo"] - } -} - -func Benchmark_HashMap_GetSmallKey(b *testing.B) { - b.StopTimer() - m := New() - m.Set(foo, bar) - b.StartTimer() - - for i := 0; i < b.N; i++ { - _ = m.Get(foo) - } -} - -func Benchmark_GoMap____GetMedKey(b *testing.B) { - b.StopTimer() - ts := string(med) - m := make(map[string][]byte) - m[ts] = bar - b.StartTimer() - - for i := 0; i < b.N; i++ { - _ = m[ts] - } -} - -func Benchmark_HashMap__GetMedKey(b *testing.B) { - b.StopTimer() - m := New() - m.Set(sub, bar) - b.StartTimer() - - for i := 0; i < b.N; i++ { - _ = m.Get(med) - } -} - -func Benchmark_GoMap____GetLrgKey(b *testing.B) { - b.StopTimer() - ts := string(sub) - m := make(map[string][]byte) - m[ts] = bar - b.StartTimer() - - for i := 0; i < b.N; i++ { - _ = m[ts] - } -} - -func Benchmark_HashMap__GetLrgKey(b *testing.B) { - b.StopTimer() - m := New() - m.Set(sub, bar) - b.StartTimer() - - for i := 0; i < b.N; i++ { - _ = m.Get(sub) - } -} - -func Benchmark_GoMap_________Set(b *testing.B) { - b.StopTimer() - m := make(map[string][]byte) - b.StartTimer() - - for i := 0; i < b.N; i++ { - m["foo"] = bar - } -} - -func Benchmark_HashMap_______Set(b *testing.B) { - b.StopTimer() - m := New() - b.StartTimer() - - for i := 0; i < b.N; i++ { - m.Set(foo, bar) - } -} diff --git a/hashmap/rand_evict.go b/hashmap/rand_evict.go deleted file mode 100644 index 81c4ce6d..00000000 --- a/hashmap/rand_evict.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2012-2013 Apcera Inc. All rights reserved. - -package hashmap - -import ( - "math/rand" - "time" -) - -// We use init to setup the random number generator -func init() { - rand.Seed(time.Now().UnixNano()) -} - -// RemoveRandom can be used for a random policy eviction. -// This is stochastic but very fast and does not impede -// performance like LRU, LFU or even ARC based implementations. -func (h *HashMap) RemoveRandom() { - if h.used == 0 { - return - } - index := (rand.Int()) & int(h.msk) - // Walk forward til we find an entry - for i := index; i < len(h.bkts); i++ { - e := &h.bkts[i] - if *e != nil { - *e = (*e).next - h.used-- - return - } - } - // If we are here we hit end and did not remove anything, - // use the index and walk backwards. - for i := index; i >= 0; i-- { - e := &h.bkts[i] - if *e != nil { - *e = (*e).next - h.used-- - return - } - } - panic("Should not reach here..") -} diff --git a/hashmap/results.txt b/hashmap/results.txt deleted file mode 100644 index 8db09dd8..00000000 --- a/hashmap/results.txt +++ /dev/null @@ -1,52 +0,0 @@ -2015 iMac 5k 4Ghz i7 -MacOSX 10.11.2 - -==================== -Go version go1.5.2 -==================== - -PASS -Benchmark_GoMap___GetSmallKey-8 300000000 5.06 ns/op 197.63 mops/s -Benchmark_HashMap_GetSmallKey-8 100000000 10.6 ns/op 94.34 mops/s -Benchmark_GoMap____GetMedKey-8 300000000 5.09 ns/op 196.46 mops/s -Benchmark_HashMap__GetMedKey-8 200000000 6.75ns/op 148.15 mops/s -Benchmark_GoMap____GetLrgKey-8 300000000 4.88ns/op 204.91 mops/s -Benchmark_HashMap__GetLrgKey-8 100000000 17.8 ns/op 56.18 mops/s -Benchmark_GoMap_________Set-8 50000000 26.3 ns/op 38.02 mops/s -Benchmark_HashMap_______Set-8 20000000 82.4 ns/op 12.13 mops/s - -2013 MacbookAir 11" i7 1.7Ghz Haswell -Linux mint15 3.8.0-19 - -==================== -Go version go1.2.1 -==================== - -Benchmark_GoMap___GetSmallKey 500000000 7.57 ns/op 132.05 mops/s -Benchmark_HashMap_GetSmallKey 100000000 14.30 ns/op 70.08 mops/s -Benchmark_GoMap____GetMedKey 500000000 4.83 ns/op 207.01 mops/s -Benchmark_HashMap__GetMedKey 200000000 9.54 ns/op 104.82 mops/s -Benchmark_GoMap____GetLrgKey 500000000 4.39 ns/op 227.79 mops/s -Benchmark_HashMap__GetLrgKey 100000000 24.50 ns/op 40.77 mops/s - -==================== -Go version go1.2.1 -==================== - -Benchmark_GoMap___GetSmallKey 200000000 8.77 ns/op 114.02 mops/s -Benchmark_HashMap_GetSmallKey 100000000 14.80 ns/op 67.53 mops/s -Benchmark_GoMap____GetMedKey 500000000 6.21 ns/op 161.05 mops/s -Benchmark_HashMap__GetMedKey 200000000 9.51 ns/op 105.15 mops/s -Benchmark_GoMap____GetLrgKey 100000000 18.30 ns/op 54.68 mops/s -Benchmark_HashMap__GetLrgKey 100000000 24.80 ns/op 40.36 mops/s - -==================== -Go version go1.0.3 -==================== - -Benchmark_GoMap___GetSmallKey 50000000 52.20 ns/op 19.17 mops/s -Benchmark_HashMap_GetSmallKey 100000000 15.50 ns/op 64.34 mops/s -Benchmark_GoMap____GetMedKey 50000000 61.60 ns/op 16.24 mops/s -Benchmark_HashMap__GetMedKey 200000000 8.91 ns/op 112.20 mops/s -Benchmark_GoMap____GetLrgKey 20000000 86.90 ns/op 11.51 mops/s -Benchmark_HashMap__GetLrgKey 100000000 25.40 ns/op 39.44 mops/s diff --git a/scripts/cov.sh b/scripts/cov.sh index 82615dcf..73f3ca9f 100755 --- a/scripts/cov.sh +++ b/scripts/cov.sh @@ -5,11 +5,8 @@ rm -rf ./cov mkdir cov go test -v -covermode=atomic -coverprofile=./cov/auth.out ./auth go test -v -covermode=atomic -coverprofile=./cov/conf.out ./conf -go test -v -covermode=atomic -coverprofile=./cov/hash.out ./hash -go test -v -covermode=atomic -coverprofile=./cov/hashmap.out ./hashmap go test -v -covermode=atomic -coverprofile=./cov/log.out ./logger go test -v -covermode=atomic -coverprofile=./cov/server.out ./server -go test -v -covermode=atomic -coverprofile=./cov/sublist.out ./sublist go test -v -covermode=atomic -coverprofile=./cov/test.out ./test gocovmerge ./cov/*.out > acc.out rm -rf ./cov diff --git a/server/client.go b/server/client.go index 8a90194c..4f5a61e9 100644 --- a/server/client.go +++ b/server/client.go @@ -1,4 +1,4 @@ -// Copyright 2012-2015 Apcera Inc. All rights reserved. +// Copyright 2012-2016 Apcera Inc. All rights reserved. package server @@ -11,9 +11,6 @@ import ( "sync" "sync/atomic" "time" - - "github.com/nats-io/gnatsd/hashmap" - "github.com/nats-io/gnatsd/sublist" ) func init() { @@ -48,7 +45,7 @@ type client struct { ncs string bw *bufio.Writer srv *Server - subs *hashmap.HashMap + subs map[string]*subscription pcd map[*client]struct{} atmr *time.Timer ptmr *time.Timer @@ -102,7 +99,7 @@ func (c *client) initClient(tlsConn bool) { s := c.srv c.cid = atomic.AddUint64(&s.gcid, 1) c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize) - c.subs = hashmap.New() + c.subs = make(map[string]*subscription) c.debug = (atomic.LoadInt32(&debug) != 0) c.trace = (atomic.LoadInt32(&trace) != 0) @@ -457,7 +454,7 @@ func (c *client) processPub(arg []byte) error { return ErrMaxPayload } - if c.opts.Pedantic && !sublist.IsValidLiteralSubject(c.pa.subject) { + if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) { c.sendErr("Invalid Subject") } return nil @@ -516,12 +513,13 @@ func (c *client) processSub(argo []byte) (err error) { // We can have two SUB protocols coming from a route due to some // race conditions. We should make sure that we process only one. - if c.subs.Get(sub.sid) == nil { - c.subs.Set(sub.sid, sub) + sid := string(sub.sid) + if c.subs[sid] == nil { + c.subs[sid] = sub if c.srv != nil { - err = c.srv.sl.Insert(sub.subject, sub) + err = c.srv.sl.Insert(sub) if err != nil { - c.subs.Remove(sub.sid) + delete(c.subs, sid) } else { shouldForward = c.typ != ROUTER } @@ -549,9 +547,9 @@ func (c *client) unsubscribe(sub *subscription) { return } c.traceOp("<-> %s", "DELSUB", sub.sid) - c.subs.Remove(sub.sid) + delete(c.subs, string(sub.sid)) if c.srv != nil { - c.srv.sl.Remove(sub.subject, sub) + c.srv.sl.Remove(sub) } } @@ -578,7 +576,7 @@ func (c *client) processUnsub(arg []byte) error { ok := false c.mu.Lock() - if sub, ok = (c.subs.Get(sid)).(*subscription); ok { + if sub, ok = c.subs[string(sid)]; ok { if max > 0 { sub.max = int64(max) } else { @@ -747,7 +745,7 @@ func (c *client) processMsg(msg []byte) { atomic.AddInt64(&srv.inMsgs, 1) atomic.AddInt64(&srv.inBytes, msgSize) - r := srv.sl.Match(c.pa.subject) + r := srv.sl.Match(string(c.pa.subject)) if len(r) <= 0 { return } @@ -779,9 +777,7 @@ func (c *client) processMsg(msg []byte) { var qmap map[string][]*subscription // Loop over all subscriptions that match. - for _, v := range r { - sub := v.(*subscription) - + for _, sub := range r { // Process queue group subscriptions by gathering them all up // here. We will pick the winners when we are done processing // all of the subscriptions. @@ -958,7 +954,10 @@ func (c *client) closeConnection() { c.nc = nil // Snapshot for use. - subs := c.subs.All() + subs := make([]*subscription, 0, len(c.subs)) + for _, sub := range c.subs { + subs = append(subs, sub) + } srv := c.srv retryImplicit := false @@ -973,14 +972,12 @@ func (c *client) closeConnection() { srv.removeClient(c) // Remove clients subscriptions. - for _, s := range subs { - if sub, ok := s.(*subscription); ok { - srv.sl.Remove(sub.subject, sub) - // Forward on unsubscribes if we are not - // a router ourselves. - if c.typ != ROUTER { - srv.broadcastUnSubscribe(sub) - } + for _, sub := range subs { + srv.sl.Remove(sub) + // Forward on unsubscribes if we are not + // a router ourselves. + if c.typ != ROUTER { + srv.broadcastUnSubscribe(sub) } } } diff --git a/server/client_test.go b/server/client_test.go index 72370bd9..967ee490 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1,3 +1,5 @@ +// Copyright 2012-2016 Apcera Inc. All rights reserved. + package server import ( @@ -436,9 +438,8 @@ func TestClientAutoUnsubExactReceived(t *testing.T) { <-ch // We should not have any subscriptions in place here. - if c.subs.Count() != 0 { - t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n", - c.subs.Count()) + if len(c.subs) != 0 { + t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n", len(c.subs)) } } @@ -466,9 +467,8 @@ func TestClientUnsubAfterAutoUnsub(t *testing.T) { <-ch // We should not have any subscriptions in place here. - if c.subs.Count() != 0 { - t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n", - c.subs.Count()) + if len(c.subs) != 0 { + t.Fatalf("Wrong number of subscriptions: expected 0, got %d\n", len(c.subs)) } } diff --git a/server/monitor.go b/server/monitor.go index 449bf3af..d07dda22 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -13,8 +13,6 @@ import ( "strconv" "sync/atomic" "time" - - "github.com/nats-io/gnatsd/sublist" ) // Snapshot this @@ -100,7 +98,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { case byCid: pairs[i] = Pair{Key: client, Val: int64(client.cid)} case bySubs: - pairs[i] = Pair{Key: client, Val: int64(client.subs.Count())} + pairs[i] = Pair{Key: client, Val: int64(len(client.subs))} case byPending: pairs[i] = Pair{Key: client, Val: int64(client.bw.Buffered())} case byOutMsgs: @@ -167,7 +165,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { ci.Idle = myUptime(c.Now.Sub(client.last)) ci.OutMsgs = client.outMsgs ci.OutBytes = client.outBytes - ci.NumSubs = client.subs.Count() + ci.NumSubs = uint32(len(client.subs)) ci.Pending = client.bw.Buffered() ci.Name = client.opts.Name ci.Lang = client.opts.Lang @@ -215,7 +213,11 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { } if subs == 1 { - ci.Subs = castToSliceString(client.subs.All()) + sublist := make([]*subscription, 0, len(client.subs)) + for _, sub := range client.subs { + sublist = append(sublist, sub) + } + ci.Subs = castToSliceString(sublist) } client.mu.Unlock() @@ -231,17 +233,17 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { ResponseHandler(w, r, b) } -func castToSliceString(input []interface{}) []string { +func castToSliceString(input []*subscription) []string { output := make([]string, 0, len(input)) for _, line := range input { - output = append(output, string(line.(*subscription).subject)) + output = append(output, string(line.subject)) } return output } // Subsz represents detail information on current connections. type Subsz struct { - *sublist.Stats + *SublistStats } // Routez represents detailed information on current client connections. @@ -292,11 +294,15 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) { OutMsgs: r.outMsgs, InBytes: r.inBytes, OutBytes: r.outBytes, - NumSubs: r.subs.Count(), + NumSubs: uint32(len(r.subs)), } if subs == 1 { - ri.Subs = castToSliceString(r.subs.All()) + sublist := make([]*subscription, 0, len(r.subs)) + for _, sub := range r.subs { + sublist = append(sublist, sub) + } + ri.Subs = castToSliceString(sublist) } r.mu.Unlock() @@ -325,7 +331,6 @@ func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) { s.mu.Unlock() st := &Subsz{s.sl.Stats()} - b, err := json.MarshalIndent(st, "", " ") if err != nil { Errorf("Error marshalling response to /subscriptionsz request: %v", err) diff --git a/server/route.go b/server/route.go index 54cbcadd..6d0d03f5 100644 --- a/server/route.go +++ b/server/route.go @@ -235,14 +235,15 @@ func (s *Server) sendLocalSubsToRoute(route *client) { s.mu.Lock() for _, client := range s.clients { client.mu.Lock() - subs := client.subs.All() + subs := make([]*subscription, 0, len(client.subs)) + for _, sub := range client.subs { + subs = append(subs, sub) + } client.mu.Unlock() - for _, s := range subs { - if sub, ok := s.(*subscription); ok { - rsid := routeSid(sub) - proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid) - b.WriteString(proto) - } + for _, sub := range subs { + rsid := routeSid(sub) + proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid) + b.WriteString(proto) } } s.mu.Unlock() @@ -411,7 +412,7 @@ func (s *Server) routeSidQueueSubscriber(rsid []byte) (*subscription, bool) { } sid := matches[RSID_SID_INDEX] - if sub, ok := (client.subs.Get(sid)).(*subscription); ok { + if sub, ok := client.subs[string(sid)]; ok { return sub, true } return nil, true diff --git a/server/server.go b/server/server.go index e544a747..4130bb67 100644 --- a/server/server.go +++ b/server/server.go @@ -19,8 +19,6 @@ import ( // Allow dynamic profiling. _ "net/http/pprof" - - "github.com/nats-io/gnatsd/sublist" ) // Info is the information sent to clients to help them understand information @@ -47,7 +45,7 @@ type Server struct { mu sync.Mutex info Info infoJSON []byte - sl *sublist.Sublist + sl *Sublist opts *Options cAuth Auth rAuth Auth @@ -101,7 +99,7 @@ func New(opts *Options) *Server { s := &Server{ info: info, - sl: sublist.New(), + sl: NewSublist(), opts: opts, debug: opts.Debug, trace: opts.Trace, @@ -697,9 +695,9 @@ func (s *Server) NumClients() int { // NumSubscriptions will report how many subscriptions are active. func (s *Server) NumSubscriptions() uint32 { s.mu.Lock() - defer s.mu.Unlock() - stats := s.sl.Stats() - return stats.NumSubs + subs := s.sl.Count() + s.mu.Unlock() + return subs } // Addr will return the net.Addr object for the current listener. diff --git a/server/split_test.go b/server/split_test.go index 8e940bfb..610e7a7f 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2015 Apcera Inc. All rights reserved. +// Copyright 2012-2016 Apcera Inc. All rights reserved. package server @@ -6,9 +6,6 @@ import ( "bytes" "net" "testing" - - "github.com/nats-io/gnatsd/hashmap" - "github.com/nats-io/gnatsd/sublist" ) func TestSplitBufferSubOp(t *testing.T) { @@ -16,8 +13,8 @@ func TestSplitBufferSubOp(t *testing.T) { defer cli.Close() defer trash.Close() - s := &Server{sl: sublist.New()} - c := &client{srv: s, subs: hashmap.New(), nc: cli} + s := &Server{sl: NewSublist()} + c := &client{srv: s, subs: make(map[string]*subscription), nc: cli} subop := []byte("SUB foo 1\r\n") subop1 := subop[:6] @@ -35,11 +32,11 @@ func TestSplitBufferSubOp(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected OP_START state vs %d\n", c.state) } - r := s.sl.Match([]byte("foo")) + r := s.sl.Match("foo") if r == nil || len(r) != 1 { t.Fatalf("Did not match subscription properly: %+v\n", r) } - sub := r[0].(*subscription) + sub := r[0] if !bytes.Equal(sub.subject, []byte("foo")) { t.Fatalf("Subject did not match expected 'foo' : '%s'\n", sub.subject) } @@ -52,8 +49,8 @@ func TestSplitBufferSubOp(t *testing.T) { } func TestSplitBufferUnsubOp(t *testing.T) { - s := &Server{sl: sublist.New()} - c := &client{srv: s, subs: hashmap.New()} + s := &Server{sl: NewSublist()} + c := &client{srv: s, subs: make(map[string]*subscription)} subop := []byte("SUB foo 1024\r\n") if err := c.parse(subop); err != nil { @@ -79,14 +76,14 @@ func TestSplitBufferUnsubOp(t *testing.T) { if c.state != OP_START { t.Fatalf("Expected OP_START state vs %d\n", c.state) } - r := s.sl.Match([]byte("foo")) + r := s.sl.Match("foo") if r != nil && len(r) != 0 { t.Fatalf("Should be no subscriptions in results: %+v\n", r) } } func TestSplitBufferPubOp(t *testing.T) { - c := &client{subs: hashmap.New()} + c := &client{subs: make(map[string]*subscription)} pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r") pub1 := pub[:2] pub2 := pub[2:9] @@ -152,7 +149,7 @@ func TestSplitBufferPubOp(t *testing.T) { } func TestSplitBufferPubOp2(t *testing.T) { - c := &client{subs: hashmap.New()} + c := &client{subs: make(map[string]*subscription)} pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r\n") pub1 := pub[:30] pub2 := pub[30:] @@ -172,7 +169,7 @@ func TestSplitBufferPubOp2(t *testing.T) { } func TestSplitBufferPubOp3(t *testing.T) { - c := &client{subs: hashmap.New()} + c := &client{subs: make(map[string]*subscription)} pubAll := []byte("PUB foo bar 11\r\nhello world\r\n") pub := pubAll[:16] @@ -198,7 +195,7 @@ func TestSplitBufferPubOp3(t *testing.T) { } func TestSplitBufferPubOp4(t *testing.T) { - c := &client{subs: hashmap.New()} + c := &client{subs: make(map[string]*subscription)} pubAll := []byte("PUB foo 11\r\nhello world\r\n") pub := pubAll[:12] @@ -224,7 +221,7 @@ func TestSplitBufferPubOp4(t *testing.T) { } func TestSplitBufferPubOp5(t *testing.T) { - c := &client{subs: hashmap.New()} + c := &client{subs: make(map[string]*subscription)} pubAll := []byte("PUB foo 11\r\nhello world\r\n") // Splits need to be on MSG_END now too, so make sure we check that. @@ -243,7 +240,7 @@ func TestSplitBufferPubOp5(t *testing.T) { } func TestSplitConnectArg(t *testing.T) { - c := &client{subs: hashmap.New()} + c := &client{subs: make(map[string]*subscription)} connectAll := []byte("CONNECT {\"verbose\":false,\"ssl_required\":false," + "\"user\":\"test\",\"pedantic\":true,\"pass\":\"pass\"}\r\n") @@ -291,7 +288,7 @@ func TestSplitConnectArg(t *testing.T) { } func TestSplitDanglingArgBuf(t *testing.T) { - c := &client{subs: hashmap.New()} + c := &client{subs: make(map[string]*subscription)} // We test to make sure we do not dangle any argBufs after processing // since that could lead to performance issues. @@ -373,7 +370,7 @@ func TestSplitMsgArg(t *testing.T) { } func TestSplitBufferMsgOp(t *testing.T) { - c := &client{subs: hashmap.New(), typ: ROUTER} + c := &client{subs: make(map[string]*subscription), typ: ROUTER} msg := []byte("MSG foo.bar QRSID:15:3 _INBOX.22 11\r\nhello world\r") msg1 := msg[:2] msg2 := msg[2:9] diff --git a/server/sublist.go b/server/sublist.go new file mode 100644 index 00000000..5be67ace --- /dev/null +++ b/server/sublist.go @@ -0,0 +1,532 @@ +// Copyright 2016 Apcera Inc. All rights reserved. + +// Package sublist is a routing mechanism to handle subject distribution +// and provides a facility to match subjects from published messages to +// interested subscribers. Subscribers can have wildcard subjects to match +// multiple published subjects. +package server + +import ( + "errors" + "strings" + "sync" + "sync/atomic" +) + +// Common byte variables for wildcards and token separator. +const ( + pwc = '*' + fwc = '>' + tsep = "." + btsep = '.' +) + +// Sublist related errors +var ( + ErrInvalidSubject = errors.New("sublist: Invalid Subject") + ErrNotFound = errors.New("sublist: No Matches Found") +) + +// cacheMax is used to bound limit the frontend cache +const slCacheMax = 1024 + +// A Sublist stores and efficiently retrieves subscriptions. +type Sublist struct { + sync.RWMutex + matches uint64 + cacheHits uint64 + inserts uint64 + removes uint64 + cache map[string][]*subscription + root *level + count uint32 +} + +// A node contains subscriptions and a pointer to the next level. +type node struct { + next *level + subs []*subscription +} + +// A level represents a group of nodes and special pointers to +// wildcard nodes. +type level struct { + nodes map[string]*node + pwc, fwc *node +} + +// Create a new default node. +func newNode() *node { + return &node{subs: make([]*subscription, 0, 4)} +} + +// Create a new default level. We use FNV1A as the hash +// algortihm for the tokens, which should be short. +func newLevel() *level { + return &level{nodes: make(map[string]*node)} +} + +// New will create a default sublist +func NewSublist() *Sublist { + return &Sublist{root: newLevel(), cache: make(map[string][]*subscription)} +} + +// Insert adds a subscription into the sublist +func (s *Sublist) Insert(sub *subscription) error { + // copy the subject since we hold this and this might be part of a large byte slice. + subject := string(append([]byte(nil), sub.subject...)) + tsa := [32]string{} + tokens := tsa[:0] + start := 0 + for i := 0; i < len(subject); i++ { + if subject[i] == btsep { + tokens = append(tokens, subject[start:i]) + start = i + 1 + } + } + tokens = append(tokens, subject[start:]) + + s.Lock() + + sfwc := false + l := s.root + var n *node + + for _, t := range tokens { + if len(t) == 0 || sfwc { + s.Unlock() + return ErrInvalidSubject + } + + switch t[0] { + case pwc: + n = l.pwc + case fwc: + n = l.fwc + sfwc = true + default: + n = l.nodes[t] + } + if n == nil { + n = newNode() + switch t[0] { + case pwc: + l.pwc = n + case fwc: + l.fwc = n + default: + l.nodes[t] = n + } + } + if n.next == nil { + n.next = newLevel() + } + l = n.next + } + n.subs = append(n.subs, sub) + s.count++ + s.inserts++ + + s.addToCache(subject, sub) + + s.Unlock() + + return nil +} + +// addToCache will add the new entry to existing cache +// entries if needed. Assumes write lock is held. +func (s *Sublist) addToCache(subject string, sub *subscription) { + for k, results := range s.cache { + if matchLiteral(k, subject) { + // Copy since others may have a reference. + nr := make([]*subscription, len(results), len(results)+1) + copy(nr, results) + s.cache[k] = append(nr, sub) + } + } +} + +// removeFromCache will remove the sub from any active cache entries. +// Assumes write lock is held. +func (s *Sublist) removeFromCache(subject string, sub *subscription) { + for k, _ := range s.cache { + if !matchLiteral(k, subject) { + continue + } + // Since someone else may be referecing, can't modify the list + // safely, just let it re-populate. + delete(s.cache, k) + } +} + +// Match will match all entries to the literal subject. +// It will return a slice of results. +// Note that queue subscribers will only have one member selected +// and returned for each queue group. +func (s *Sublist) Match(subject string) []*subscription { + s.RLock() + atomic.AddUint64(&s.matches, 1) + rc, ok := s.cache[subject] + s.RUnlock() + if ok { + atomic.AddUint64(&s.cacheHits, 1) + return rc + } + + tsa := [32]string{} + tokens := tsa[:0] + start := 0 + for i := 0; i < len(subject); i++ { + if subject[i] == btsep { + tokens = append(tokens, subject[start:i]) + start = i + 1 + } + } + tokens = append(tokens, subject[start:]) + + // FIXME(dlc) - Make pool? + results := []*subscription{} + + s.RLock() + results = matchLevel(s.root, tokens, results) + s.RUnlock() + + s.Lock() + // Add to our cache + s.cache[subject] = results + // Bound the number of entries to sublistMaxCache + if len(s.cache) > slCacheMax { + for k, _ := range s.cache { + delete(s.cache, k) + break + } + } + s.Unlock() + + return results +} + +// matchLevel is used to recursively descend into the trie. +func matchLevel(l *level, toks []string, results []*subscription) []*subscription { + var pwc, n *node + for i, t := range toks { + if l == nil { + return results + } + if l.fwc != nil { + results = append(results, l.fwc.subs...) + } + if pwc = l.pwc; pwc != nil { + results = matchLevel(pwc.next, toks[i+1:], results) + } + n = l.nodes[t] + if n != nil { + l = n.next + } else { + l = nil + } + } + if n != nil { + results = append(results, n.subs...) + } + if pwc != nil { + results = append(results, pwc.subs...) + } + return results +} + +// lnt is used to track descent into levels for a removal for pruning. +type lnt struct { + l *level + n *node + t string +} + +// Remove will remove a subscription. +func (s *Sublist) Remove(sub *subscription) error { + subject := string(sub.subject) + tsa := [32]string{} + tokens := tsa[:0] + start := 0 + for i := 0; i < len(subject); i++ { + if subject[i] == btsep { + tokens = append(tokens, subject[start:i]) + start = i + 1 + } + } + tokens = append(tokens, subject[start:]) + + s.Lock() + defer s.Unlock() + + sfwc := false + l := s.root + var n *node + + // Track levels for pruning + var lnts [32]lnt + levels := lnts[:0] + + for _, t := range tokens { + if len(t) == 0 || sfwc { + return ErrInvalidSubject + } + if l == nil { + return ErrNotFound + } + switch t[0] { + case pwc: + n = l.pwc + case fwc: + n = l.fwc + sfwc = true + default: + n = l.nodes[t] + } + if n != nil { + levels = append(levels, lnt{l, n, t}) + l = n.next + } else { + l = nil + } + } + if !s.removeFromNode(n, sub) { + return ErrNotFound + } + + s.count-- + s.removes++ + + for i := len(levels) - 1; i >= 0; i-- { + l, n, t := levels[i].l, levels[i].n, levels[i].t + if n.isEmpty() { + l.pruneNode(n, t) + } + } + s.removeFromCache(subject, sub) + + return nil +} + +// pruneNode is used to prune an empty node from the tree. +func (l *level) pruneNode(n *node, t string) { + if n == nil { + return + } + if n == l.fwc { + l.fwc = nil + } else if n == l.pwc { + l.pwc = nil + } else { + delete(l.nodes, t) + } +} + +// isEmpty will test if the node has any entries. Used +// in pruning. +func (n *node) isEmpty() bool { + if len(n.subs) == 0 { + if n.next == nil || n.next.numNodes() == 0 { + return true + } + } + return false +} + +// Return the number of nodes for the given level. +func (l *level) numNodes() int { + num := len(l.nodes) + if l.pwc != nil { + num++ + } + if l.fwc != nil { + num++ + } + return num +} + +// Remove the sub for the given node. +func (s *Sublist) removeFromNode(n *node, sub *subscription) bool { + if n == nil { + return false + } + sl := n.subs + for i := 0; i < len(sl); i++ { + if sl[i] == sub { + sl[i] = sl[len(sl)-1] + sl[len(sl)-1] = nil + sl = sl[:len(sl)-1] + n.subs = shrinkAsNeeded(sl) + return true + } + } + return false +} + +// Checks if we need to do a resize. This is for very large growth then +// subsequent return to a more normal size from unsubscribe. +func shrinkAsNeeded(sl []*subscription) []*subscription { + lsl := len(sl) + csl := cap(sl) + // Don't bother if list not too big + if csl <= 8 { + return sl + } + pFree := float32(csl-lsl) / float32(csl) + if pFree > 0.50 { + return append([]*subscription(nil), sl...) + } + return sl +} + +// Count returns the number of subscriptions. +func (s *Sublist) Count() uint32 { + s.RLock() + defer s.RUnlock() + return s.count +} + +// CacheCount returns the number of result sets in the cache. +func (s *Sublist) CacheCount() int { + s.RLock() + defer s.RUnlock() + return len(s.cache) +} + +// Public stats for the sublist +type SublistStats struct { + NumSubs uint32 `json:"num_subscriptions"` + NumCache uint32 `json:"num_cache"` + NumInserts uint64 `json:"num_inserts"` + NumRemoves uint64 `json:"num_removes"` + NumMatches uint64 `json:"num_matches"` + CacheHitRate float64 `json:"cache_hit_rate"` + MaxFanout uint32 `json:"max_fanout"` + AvgFanout float64 `json:"avg_fanout"` +} + +// Stats will return a stats structure for the current state. +func (s *Sublist) Stats() *SublistStats { + s.Lock() + defer s.Unlock() + + st := &SublistStats{} + st.NumSubs = s.count + st.NumCache = uint32(len(s.cache)) + st.NumInserts = s.inserts + st.NumRemoves = s.removes + st.NumMatches = s.matches + if s.matches > 0 { + st.CacheHitRate = float64(s.cacheHits) / float64(s.matches) + } + // whip through cache for fanout stats + tot, max := 0, 0 + for _, results := range s.cache { + l := len(results) + tot += l + if l > max { + max = l + } + } + st.MaxFanout = uint32(max) + if tot > 0 { + st.AvgFanout = float64(tot) / float64(len(s.cache)) + } + return st +} + +// numLevels will return the maximum number of levels +// contained in the Sublist tree. +func (s *Sublist) numLevels() int { + return visitLevel(s.root, 0) +} + +// visitLevel is used to descend the Sublist tree structure +// recursively. +func visitLevel(l *level, depth int) int { + if l == nil || l.numNodes() == 0 { + return depth + } + + depth++ + maxDepth := depth + + for _, n := range l.nodes { + if n == nil { + continue + } + newDepth := visitLevel(n.next, depth) + if newDepth > maxDepth { + maxDepth = newDepth + } + } + if l.pwc != nil { + pwcDepth := visitLevel(l.pwc.next, depth) + if pwcDepth > maxDepth { + maxDepth = pwcDepth + } + } + if l.fwc != nil { + fwcDepth := visitLevel(l.fwc.next, depth) + if fwcDepth > maxDepth { + maxDepth = fwcDepth + } + } + return maxDepth +} + +// IsValidLiteralSubject returns true if a subject is valid, false otherwise +func IsValidLiteralSubject(subject string) bool { + tokens := strings.Split(string(subject), tsep) + for _, t := range tokens { + if len(t) == 0 { + return false + } + if len(t) > 1 { + continue + } + switch t[0] { + case pwc, fwc: + return false + } + } + return true +} + +// matchLiteral is used to test literal subjects, those that do not have any +// wildcards, with a target subject. This is used in the cache layer. +func matchLiteral(literal, subject string) bool { + li := 0 + ll := len(literal) + for i := 0; i < len(subject); i++ { + if li >= ll { + return false + } + b := subject[i] + switch b { + case pwc: + // Skip token in literal + ll := len(literal) + for { + if li >= ll || literal[li] == btsep { + li-- + break + } + li++ + } + case fwc: + return true + default: + if b != literal[li] { + return false + } + } + li++ + } + // Make sure we have processed all of the literal's chars.. + if li < ll { + return false + } + return true +} diff --git a/server/sublist_test.go b/server/sublist_test.go new file mode 100644 index 00000000..3d34b0c8 --- /dev/null +++ b/server/sublist_test.go @@ -0,0 +1,457 @@ +package server + +import ( + "fmt" + "runtime" + "strings" + "sync" + "testing" + "time" + + dbg "runtime/debug" +) + +func stackFatalf(t *testing.T, f string, args ...interface{}) { + lines := make([]string, 0, 32) + msg := fmt.Sprintf(f, args...) + lines = append(lines, msg) + + // Generate the Stack of callers: Skip us and verify* frames. + for i := 2; true; i++ { + _, file, line, ok := runtime.Caller(i) + if !ok { + break + } + msg := fmt.Sprintf("%d - %s:%d", i, file, line) + lines = append(lines, msg) + } + t.Fatalf("%s", strings.Join(lines, "\n")) +} + +func verifyCount(s *Sublist, count uint32, t *testing.T) { + if s.Count() != count { + stackFatalf(t, "Count is %d, should be %d", s.Count(), count) + } +} + +func verifyLen(r []*subscription, l int, t *testing.T) { + if r == nil || len(r) != l { + stackFatalf(t, "Results len is %d, should be %d", len(r), l) + } +} + +func verifyNumLevels(s *Sublist, expected int, t *testing.T) { + dl := s.numLevels() + if dl != expected { + stackFatalf(t, "NumLevels is %d, should be %d", dl, expected) + } +} + +func verifyMember(r []*subscription, val *subscription, t *testing.T) { + for _, v := range r { + if v == nil { + continue + } + if v == val { + return + } + } + stackFatalf(t, "Value '%+v' not found in results", val) +} + +// Helper to generate test subscriptions. +func newSub(subject string) *subscription { + return &subscription{subject: []byte(subject)} +} + +func TestSublistInit(t *testing.T) { + s := NewSublist() + verifyCount(s, 0, t) +} + +func TestSublistInsertCount(t *testing.T) { + s := NewSublist() + s.Insert(newSub("foo")) + s.Insert(newSub("bar")) + s.Insert(newSub("foo.bar")) + verifyCount(s, 3, t) +} + +func TestSublistSimple(t *testing.T) { + s := NewSublist() + subject := "foo" + sub := newSub(subject) + s.Insert(sub) + r := s.Match(subject) + verifyLen(r, 1, t) + verifyMember(r, sub, t) +} + +func TestSublistSimpleMultiTokens(t *testing.T) { + s := NewSublist() + subject := "foo.bar.baz" + sub := newSub(subject) + s.Insert(sub) + r := s.Match(subject) + verifyLen(r, 1, t) + verifyMember(r, sub, t) +} + +func TestSublistPartialWildcard(t *testing.T) { + s := NewSublist() + lsub := newSub("a.b.c") + psub := newSub("a.*.c") + s.Insert(lsub) + s.Insert(psub) + r := s.Match("a.b.c") + verifyLen(r, 2, t) + verifyMember(r, lsub, t) + verifyMember(r, psub, t) +} + +func TestSublistPartialWildcardAtEnd(t *testing.T) { + s := NewSublist() + lsub := newSub("a.b.c") + psub := newSub("a.b.*") + s.Insert(lsub) + s.Insert(psub) + r := s.Match("a.b.c") + verifyLen(r, 2, t) + verifyMember(r, lsub, t) + verifyMember(r, psub, t) +} + +func TestSublistFullWildcard(t *testing.T) { + s := NewSublist() + lsub := newSub("a.b.c") + fsub := newSub("a.>") + s.Insert(lsub) + s.Insert(fsub) + r := s.Match("a.b.c") + verifyLen(r, 2, t) + verifyMember(r, lsub, t) + verifyMember(r, fsub, t) +} + +func TestSublistRemove(t *testing.T) { + s := NewSublist() + subject := "a.b.c.d" + sub := newSub(subject) + s.Insert(sub) + verifyCount(s, 1, t) + r := s.Match(subject) + verifyLen(r, 1, t) + s.Remove(newSub("a.b.c")) + verifyCount(s, 1, t) + s.Remove(sub) + verifyCount(s, 0, t) + r = s.Match(subject) + verifyLen(r, 0, t) +} + +func TestSublistRemoveWildcard(t *testing.T) { + s := NewSublist() + subject := "a.b.c.d" + sub := newSub(subject) + psub := newSub("a.b.*.d") + fsub := newSub("a.b.>") + s.Insert(sub) + s.Insert(psub) + s.Insert(fsub) + verifyCount(s, 3, t) + r := s.Match(subject) + verifyLen(r, 3, t) + s.Remove(sub) + verifyCount(s, 2, t) + s.Remove(fsub) + verifyCount(s, 1, t) + s.Remove(psub) + verifyCount(s, 0, t) + r = s.Match(subject) + verifyLen(r, 0, t) +} + +func TestSublistRemoveCleanup(t *testing.T) { + s := NewSublist() + literal := "a.b.c.d.e.f" + depth := len(strings.Split(literal, tsep)) + sub := newSub(literal) + verifyNumLevels(s, 0, t) + s.Insert(sub) + verifyNumLevels(s, depth, t) + s.Remove(sub) + verifyNumLevels(s, 0, t) +} + +func TestSublistRemoveCleanupWildcards(t *testing.T) { + s := NewSublist() + subject := "a.b.*.d.e.>" + depth := len(strings.Split(subject, tsep)) + sub := newSub(subject) + verifyNumLevels(s, 0, t) + s.Insert(sub) + verifyNumLevels(s, depth, t) + s.Remove(sub) + verifyNumLevels(s, 0, t) +} + +func TestSublistInvalidSubjectsInsert(t *testing.T) { + s := NewSublist() + + // Insert, or subscribtions, can have wildcards, but not empty tokens, + // and can not have a FWC that is not the terminal token. + + // beginning empty token + if err := s.Insert(newSub(".foo")); err != ErrInvalidSubject { + t.Fatal("Expected invalid subject error") + } + + // trailing empty token + if err := s.Insert(newSub("foo.")); err != ErrInvalidSubject { + t.Fatal("Expected invalid subject error") + } + // empty middle token + if err := s.Insert(newSub("foo..bar")); err != ErrInvalidSubject { + t.Fatal("Expected invalid subject error") + } + // empty middle token #2 + if err := s.Insert(newSub("foo.bar..baz")); err != ErrInvalidSubject { + t.Fatal("Expected invalid subject error") + } + // fwc not terminal + if err := s.Insert(newSub("foo.>.bar")); err != ErrInvalidSubject { + t.Fatal("Expected invalid subject error") + } +} + +func TestSublistCache(t *testing.T) { + s := NewSublist() + + // Test add a remove logistics + subject := "a.b.c.d" + sub := newSub(subject) + psub := newSub("a.b.*.d") + fsub := newSub("a.b.>") + s.Insert(sub) + r := s.Match(subject) + verifyLen(r, 1, t) + s.Insert(psub) + s.Insert(fsub) + verifyCount(s, 3, t) + r = s.Match(subject) + verifyLen(r, 3, t) + s.Remove(sub) + verifyCount(s, 2, t) + s.Remove(fsub) + verifyCount(s, 1, t) + s.Remove(psub) + verifyCount(s, 0, t) + + // Check that cache is now empty + if cc := s.CacheCount(); cc != 0 { + t.Fatalf("Cache should be zero, got %d\n", cc) + } + + r = s.Match(subject) + verifyLen(r, 0, t) + + for i := 0; i < 2*slCacheMax; i++ { + s.Match(fmt.Sprintf("foo-%d\n", i)) + } + + if cc := s.CacheCount(); cc > slCacheMax { + t.Fatalf("Cache should be constrained by cacheMax, got %d for current count\n", cc) + } +} + +func checkBool(b, expected bool, t *testing.T) { + if b != expected { + dbg.PrintStack() + t.Fatalf("Expected %v, but got %v\n", expected, b) + } +} + +func TestSublistValidLiteralSubjects(t *testing.T) { + checkBool(IsValidLiteralSubject("foo"), true, t) + checkBool(IsValidLiteralSubject(".foo"), false, t) + checkBool(IsValidLiteralSubject("foo."), false, t) + checkBool(IsValidLiteralSubject("foo..bar"), false, t) + checkBool(IsValidLiteralSubject("foo.bar.*"), false, t) + checkBool(IsValidLiteralSubject("foo.bar.>"), false, t) + checkBool(IsValidLiteralSubject("*"), false, t) + checkBool(IsValidLiteralSubject(">"), false, t) +} + +func TestSublistMatchLiterals(t *testing.T) { + checkBool(matchLiteral("foo", "foo"), true, t) + checkBool(matchLiteral("foo", "bar"), false, t) + checkBool(matchLiteral("foo", "*"), true, t) + checkBool(matchLiteral("foo", ">"), true, t) + checkBool(matchLiteral("foo.bar", ">"), true, t) + checkBool(matchLiteral("foo.bar", "foo.>"), true, t) + checkBool(matchLiteral("foo.bar", "bar.>"), false, t) + checkBool(matchLiteral("stats.test.22", "stats.>"), true, t) + checkBool(matchLiteral("stats.test.22", "stats.*.*"), true, t) + checkBool(matchLiteral("foo.bar", "foo"), false, t) + checkBool(matchLiteral("stats.test.foos", "stats.test.foos"), true, t) + checkBool(matchLiteral("stats.test.foos", "stats.test.foo"), false, t) +} + +func TestSublistBadSubjectOnRemove(t *testing.T) { + bad := "a.b..d" + sub := newSub(bad) + + s := NewSublist() + if err := s.Insert(sub); err != ErrInvalidSubject { + t.Fatalf("Expected ErrInvalidSubject, got %v\n", err) + } + + if err := s.Remove(sub); err != ErrInvalidSubject { + t.Fatalf("Expected ErrInvalidSubject, got %v\n", err) + } + + badfwc := "a.>.b" + if err := s.Remove(newSub(badfwc)); err != ErrInvalidSubject { + t.Fatalf("Expected ErrInvalidSubject, got %v\n", err) + } +} + +// This is from bug report #18 +func TestSublistTwoTokenPubMatchSingleTokenSub(t *testing.T) { + s := NewSublist() + sub := newSub("foo") + s.Insert(sub) + r := s.Match("foo") + verifyLen(r, 1, t) + verifyMember(r, sub, t) + r = s.Match("foo.bar") + verifyLen(r, 0, t) +} + +// -- Benchmarks Setup -- + +var subs []*subscription +var toks = []string{"apcera", "continuum", "component", "router", "api", "imgr", "jmgr", "auth"} +var sl = NewSublist() +var results = make([]*subscription, 0, 64) + +func init() { + subs = make([]*subscription, 0, 256*1024) + subsInit("") + for i := 0; i < len(subs); i++ { + sl.Insert(subs[i]) + } + addWildcards() +} + +func subsInit(pre string) { + var sub string + for _, t := range toks { + if len(pre) > 0 { + sub = pre + tsep + t + } else { + sub = t + } + subs = append(subs, newSub(sub)) + if len(strings.Split(sub, tsep)) < 5 { + subsInit(sub) + } + } +} + +func addWildcards() { + sl.Insert(newSub("cloud.>")) + sl.Insert(newSub("cloud.continuum.component.>")) + sl.Insert(newSub("cloud.*.*.router.*")) +} + +// -- Benchmarks Setup End -- + +func Benchmark______________________SublistInsert(b *testing.B) { + s := NewSublist() + for i, l := 0, len(subs); i < b.N; i++ { + index := i % l + s.Insert(subs[index]) + } +} + +func Benchmark____________SublistMatchSingleToken(b *testing.B) { + for i := 0; i < b.N; i++ { + sl.Match("apcera") + } +} + +func Benchmark______________SublistMatchTwoTokens(b *testing.B) { + for i := 0; i < b.N; i++ { + sl.Match("apcera.continuum") + } +} + +func Benchmark____________SublistMatchThreeTokens(b *testing.B) { + for i := 0; i < b.N; i++ { + sl.Match("apcera.continuum.component") + } +} + +func Benchmark_____________SublistMatchFourTokens(b *testing.B) { + for i := 0; i < b.N; i++ { + sl.Match("apcera.continuum.component.router") + } +} + +func Benchmark_SublistMatchFourTokensSingleResult(b *testing.B) { + for i := 0; i < b.N; i++ { + sl.Match("apcera.continuum.component.router") + } +} + +func Benchmark_SublistMatchFourTokensMultiResults(b *testing.B) { + for i := 0; i < b.N; i++ { + sl.Match("cloud.continuum.component.router") + } +} + +func Benchmark_______SublistMissOnLastTokenOfFive(b *testing.B) { + for i := 0; i < b.N; i++ { + sl.Match("apcera.continuum.component.router.ZZZZ") + } +} + +func multiRead(b *testing.B, num int) { + b.StopTimer() + var swg, fwg sync.WaitGroup + swg.Add(num) + fwg.Add(num) + s := "apcera.continuum.component.router" + for i := 0; i < num; i++ { + go func() { + swg.Done() + swg.Wait() + for i := 0; i < b.N; i++ { + sl.Match(s) + } + fwg.Done() + }() + } + swg.Wait() + b.StartTimer() + fwg.Wait() +} + +func Benchmark_____________Sublist10XMultipleReads(b *testing.B) { + multiRead(b, 10) +} + +func Benchmark____________Sublist100XMultipleReads(b *testing.B) { + multiRead(b, 100) +} + +func _BenchmarkRSS(b *testing.B) { + runtime.GC() + var m runtime.MemStats + runtime.ReadMemStats(&m) + println("HEAP:", m.HeapObjects) + println("ALLOC:", m.Alloc) + println("TOTAL ALLOC:", m.TotalAlloc) + time.Sleep(30 * 1e9) +} diff --git a/sublist/sublist.go b/sublist/sublist.go deleted file mode 100644 index 6a2abb2d..00000000 --- a/sublist/sublist.go +++ /dev/null @@ -1,555 +0,0 @@ -// Copyright 2012-2016 Apcera Inc. All rights reserved. - -// Package sublist handles subject distribution and provides a facility -// match subjects to interested subscribers. Subscribers can have wildcard -// subjects to match multiple published subjects. -package sublist - -import ( - "errors" - "sync" - "sync/atomic" - "time" - - "github.com/nats-io/gnatsd/hashmap" -) - -// A Sublist stores and efficiently retrieves subscriptions. It uses a -// tree structure and an efficient RR cache to achieve quick lookups. -type Sublist struct { - mu sync.RWMutex - root *level - count uint32 - cache *hashmap.HashMap - cmax int - stats stats -} - -// Use padding for better cache alignement when multiple goroutines -// updating the stats. -type stats struct { - _ [8]int64 - matches uint64 - _ [7]int64 - cacheHits uint64 - _ [7]int64 - inserts uint64 - _ [7]int64 - removes uint64 - _ [7]int64 - since time.Time -} - -// A node contains subscriptions and a pointer to the next level. -type node struct { - next *level - subs []interface{} -} - -// A level represents a group of nodes and special pointers to -// wildcard nodes. -type level struct { - nodes *hashmap.HashMap - pwc, fwc *node -} - -// Create a new default node. -func newNode() *node { - return &node{subs: make([]interface{}, 0, 4)} -} - -// Create a new default level. We use FNV1A as the hash -// algortihm for the tokens, which should be short. -func newLevel() *level { - h := hashmap.New() - return &level{nodes: h} -} - -// defaultCacheMax is used to bound limit the frontend cache -const defaultCacheMax = 1024 - -// New will create a default sublist -func New() *Sublist { - return &Sublist{ - root: newLevel(), - cache: hashmap.New(), - cmax: defaultCacheMax, - stats: stats{since: time.Now()}, - } -} - -// Common byte variables for wildcards and token separator. -const ( - _PWC = byte('*') - _FWC = byte('>') - _SEP = byte('.') -) - -// Sublist related errors -var ( - ErrInvalidSubject = errors.New("Invalid Subject") - ErrNotFound = errors.New("No Matches Found") -) - -// split will split a subject into tokens -func split(subject []byte, tokens [][]byte) [][]byte { - start := 0 - for i, b := range subject { - if b == _SEP { - tokens = append(tokens, subject[start:i]) - start = i + 1 - } - } - return append(tokens, subject[start:]) -} - -// Insert adds a subject into the sublist -func (s *Sublist) Insert(subject []byte, sub interface{}) error { - tsa := [16][]byte{} - toks := split(subject, tsa[:0]) - - s.mu.Lock() - sfwc := false - l := s.root - var n *node - - for _, t := range toks { - if len(t) == 0 || sfwc { - s.mu.Unlock() - return ErrInvalidSubject - } - switch t[0] { - case _PWC: - n = l.pwc - case _FWC: - n = l.fwc - sfwc = true - default: - if v := l.nodes.Get(t); v == nil { - n = nil - } else { - n = v.(*node) - } - } - if n == nil { - n = newNode() - switch t[0] { - case _PWC: - l.pwc = n - case _FWC: - l.fwc = n - default: - l.nodes.Set(t, n) - } - } - if n.next == nil { - n.next = newLevel() - } - l = n.next - } - n.subs = append(n.subs, sub) - s.count++ - s.stats.inserts++ - s.addToCache(subject, sub) - s.mu.Unlock() - return nil -} - -// addToCache will add the new entry to existing cache -// entries if needed. Assumes write lock is held. -func (s *Sublist) addToCache(subject []byte, sub interface{}) { - if s.cache.Count() == 0 { - return - } - - // FIXME(dlc) avoid allocation? - all := s.cache.AllKeys() - for _, k := range all { - if !matchLiteral(k, subject) { - continue - } - r := s.cache.Get(k) - if r == nil { - continue - } - res := r.([]interface{}) - res = append(res, sub) - s.cache.Set(k, res) - } -} - -// removeFromCache will remove the sub from any active cache entries. -// Assumes write lock is held. -func (s *Sublist) removeFromCache(subject []byte, sub interface{}) { - if s.cache.Count() == 0 { - return - } - all := s.cache.AllKeys() - for _, k := range all { - if !matchLiteral(k, subject) { - continue - } - // FIXME(dlc), right now just remove all matching cache - // entries. This could be smarter and walk small result - // lists and delete the individual sub. - s.cache.Remove(k) - } -} - -// Match will match all entries to the literal subject. It will return a -// slice of results. -func (s *Sublist) Match(subject []byte) []interface{} { - // Fastpath match on cache - s.mu.RLock() - atomic.AddUint64(&s.stats.matches, 1) - r := s.cache.Get(subject) - s.mu.RUnlock() - - if r != nil { - atomic.AddUint64(&s.stats.cacheHits, 1) - return r.([]interface{}) - } - // Cache miss - - // Process subject into tokens, this is performed - // unlocked, so can be parallel. - tsa := [32][]byte{} - toks := tsa[:0] - - start := 0 - for i, b := range subject { - if b == _SEP { - toks = append(toks, subject[start:i]) - start = i + 1 - } - } - toks = append(toks, subject[start:]) - results := make([]interface{}, 0, 4) - - // Lock the sublist and lookup and add entry to cache. - s.mu.Lock() - matchLevel(s.root, toks, &results) - - // We use random eviction to bound the size of the cache. - // RR is used for speed purposes here. - if int(s.cache.Count()) >= s.cmax { - s.cache.RemoveRandom() - } - // Make sure we copy the subject key here - scopy := make([]byte, len(subject)) - copy(scopy, subject) - s.cache.Set(scopy, results) - s.mu.Unlock() - - return results -} - -// matchLevel is used to recursively descend into the trie when there -// is a cache miss. -func matchLevel(l *level, toks [][]byte, results *[]interface{}) { - var pwc, n *node - for i, t := range toks { - if l == nil { - return - } - if l.fwc != nil { - *results = append(*results, l.fwc.subs...) - } - if pwc = l.pwc; pwc != nil { - matchLevel(pwc.next, toks[i+1:], results) - } - if v := l.nodes.Get(t); v == nil { - n = nil - } else { - n = v.(*node) - } - if n != nil { - l = n.next - } else { - l = nil - } - } - if n != nil { - *results = append(*results, n.subs...) - } - if pwc != nil { - *results = append(*results, pwc.subs...) - } - return -} - -// lnt is used to track descent into a removal for pruning. -type lnt struct { - l *level - n *node - t []byte -} - -// Remove will remove any item associated with key. It will track descent -// into the trie and prune upon successful removal. -func (s *Sublist) Remove(subject []byte, sub interface{}) error { - tsa := [16][]byte{} - toks := split(subject, tsa[:0]) - - s.mu.Lock() - sfwc := false - l := s.root - var n *node - - var lnts [32]lnt - levels := lnts[:0] - - for _, t := range toks { - if len(t) == 0 || sfwc { - s.mu.Unlock() - return ErrInvalidSubject - } - if l == nil { - s.mu.Unlock() - return ErrNotFound - } - switch t[0] { - case _PWC: - n = l.pwc - case _FWC: - n = l.fwc - sfwc = true - default: - if v := l.nodes.Get(t); v == nil { - n = nil - } else { - n = v.(*node) - } - } - if n != nil { - levels = append(levels, lnt{l, n, t}) - l = n.next - } else { - l = nil - } - } - if !s.removeFromNode(n, sub) { - s.mu.Unlock() - return ErrNotFound - } - - s.count-- - s.stats.removes++ - - for i := len(levels) - 1; i >= 0; i-- { - l, n, t := levels[i].l, levels[i].n, levels[i].t - if n.isEmpty() { - l.pruneNode(n, t) - } - } - s.removeFromCache(subject, sub) - s.mu.Unlock() - return nil -} - -// pruneNode is used to prune and empty node from the tree. -func (l *level) pruneNode(n *node, t []byte) { - if n == nil { - return - } - if n == l.fwc { - l.fwc = nil - } else if n == l.pwc { - l.pwc = nil - } else { - l.nodes.Remove(t) - } -} - -// isEmpty will test if the node has any entries. Used -// in pruning. -func (n *node) isEmpty() bool { - if len(n.subs) == 0 { - if n.next == nil || n.next.numNodes() == 0 { - return true - } - } - return false -} - -// Return the number of nodes for the given level. -func (l *level) numNodes() uint32 { - num := l.nodes.Count() - if l.pwc != nil { - num++ - } - if l.fwc != nil { - num++ - } - return num -} - -// Remove the sub for the given node. -func (s *Sublist) removeFromNode(n *node, sub interface{}) bool { - if n == nil { - return false - } - for i, v := range n.subs { - if v == sub { - num := len(n.subs) - a := n.subs - copy(a[i:num-1], a[i+1:num]) - n.subs = a[0 : num-1] - return true - } - } - return false -} - -// matchLiteral is used to test literal subjects, those that do not have any -// wildcards, with a target subject. This is used in the cache layer. -func matchLiteral(literal, subject []byte) bool { - li := 0 - ll := len(literal) - for _, b := range subject { - if li >= ll { - return false - } - switch b { - case _PWC: - // Skip token in literal - ll := len(literal) - for { - if li >= ll || literal[li] == _SEP { - li-- - break - } - li++ - } - case _FWC: - return true - default: - if b != literal[li] { - return false - } - } - li++ - } - // Make sure we have processed all of the literal's chars.. - if li < ll { - return false - } - return true -} - -// IsValidLiteralSubject returns true if a subject is valid, false otherwise -func IsValidLiteralSubject(subject []byte) bool { - tsa := [16][]byte{} - toks := split(subject, tsa[:0]) - - for _, t := range toks { - if len(t) == 0 { - return false - } - if len(t) > 1 { - continue - } - switch t[0] { - case _PWC, _FWC: - return false - } - } - return true -} - -// Count return the number of stored items in the HashMap. -func (s *Sublist) Count() uint32 { return s.count } - -// Stats for the sublist -type Stats struct { - NumSubs uint32 `json:"num_subscriptions"` - NumCache uint32 `json:"num_cache"` - NumInserts uint64 `json:"num_inserts"` - NumRemoves uint64 `json:"num_removes"` - NumMatches uint64 `json:"num_matches"` - CacheHitRate float64 `json:"cache_hit_rate"` - MaxFanout uint32 `json:"max_fanout"` - AvgFanout float64 `json:"avg_fanout"` - StatsTime time.Time `json:"stats_time"` -} - -// Stats will return a stats structure for the current state. -func (s *Sublist) Stats() *Stats { - s.mu.Lock() - defer s.mu.Unlock() - - st := &Stats{} - st.NumSubs = s.count - st.NumCache = s.cache.Count() - st.NumInserts = s.stats.inserts - st.NumRemoves = s.stats.removes - st.NumMatches = s.stats.matches - if s.stats.matches > 0 { - st.CacheHitRate = float64(s.stats.cacheHits) / float64(s.stats.matches) - } - // whip through cache for fanout stats - // FIXME, creating all each time could be expensive, should do a cb version. - tot, max := 0, 0 - all := s.cache.All() - for _, r := range all { - l := len(r.([]interface{})) - tot += l - if l > max { - max = l - } - } - st.MaxFanout = uint32(max) - if tot > 0 { - st.AvgFanout = float64(tot) / float64(len(all)) - } - st.StatsTime = s.stats.since - return st -} - -// ResetStats will clear stats and update StatsTime to time.Now() -func (s *Sublist) ResetStats() { - s.stats = stats{} - s.stats.since = time.Now() -} - -// numLevels will return the maximum number of levels -// contained in the Sublist tree. -func (s *Sublist) numLevels() int { - return visitLevel(s.root, 0) -} - -// visitLevel is used to descend the Sublist tree structure -// recursively. -func visitLevel(l *level, depth int) int { - if l == nil || l.numNodes() == 0 { - return depth - } - - depth++ - maxDepth := depth - - all := l.nodes.All() - for _, a := range all { - if a == nil { - continue - } - n := a.(*node) - newDepth := visitLevel(n.next, depth) - if newDepth > maxDepth { - maxDepth = newDepth - } - } - if l.pwc != nil { - pwcDepth := visitLevel(l.pwc.next, depth) - if pwcDepth > maxDepth { - maxDepth = pwcDepth - } - } - if l.fwc != nil { - fwcDepth := visitLevel(l.fwc.next, depth) - if fwcDepth > maxDepth { - maxDepth = fwcDepth - } - } - return maxDepth -} diff --git a/sublist/sublist_test.go b/sublist/sublist_test.go deleted file mode 100644 index 0cd03712..00000000 --- a/sublist/sublist_test.go +++ /dev/null @@ -1,529 +0,0 @@ -package sublist - -import ( - "bytes" - "fmt" - "runtime" - "runtime/debug" - "strings" - "sync" - "testing" - "time" -) - -func verifyCount(s *Sublist, count uint32, t *testing.T) { - if s.Count() != count { - t.Errorf("Count is %d, should be %d", s.Count(), count) - } -} - -func verifyLen(r []interface{}, l int, t *testing.T) { - if len(r) != l { - t.Errorf("Results len is %d, should be %d", len(r), l) - } -} - -func verifyMember(r []interface{}, val string, t *testing.T) { - for _, v := range r { - if v == nil { - continue - } - if v.(string) == val { - return - } - } - t.Errorf("Value '%s' not found in results", val) -} - -func verifyNumLevels(s *Sublist, expected int, t *testing.T) { - dl := s.numLevels() - if dl != expected { - t.Errorf("NumLevels is %d, should be %d", dl, expected) - } -} - -func TestInit(t *testing.T) { - s := New() - verifyCount(s, 0, t) -} - -func TestInsertCount(t *testing.T) { - s := New() - s.Insert([]byte("foo"), "a") - s.Insert([]byte("bar"), "b") - s.Insert([]byte("foo.bar"), "b") - verifyCount(s, 3, t) -} - -func TestSimple(t *testing.T) { - s := New() - val := "a" - sub := []byte("foo") - s.Insert(sub, val) - r := s.Match(sub) - verifyLen(r, 1, t) - verifyMember(r, val, t) -} - -func TestSimpleMultiTokens(t *testing.T) { - s := New() - val := "a" - sub := []byte("foo.bar.baz") - s.Insert(sub, val) - r := s.Match(sub) - verifyLen(r, 1, t) - verifyMember(r, val, t) -} - -func TestPartialWildcard(t *testing.T) { - s := New() - literal := []byte("a.b.c") - pwc := []byte("a.*.c") - a, b := "a", "b" - s.Insert(literal, a) - s.Insert(pwc, b) - r := s.Match(literal) - verifyLen(r, 2, t) - verifyMember(r, a, t) - verifyMember(r, b, t) -} - -func TestPartialWildcardAtEnd(t *testing.T) { - s := New() - literal := []byte("a.b.c") - pwc := []byte("a.b.*") - a, b := "a", "b" - s.Insert(literal, a) - s.Insert(pwc, b) - r := s.Match(literal) - verifyLen(r, 2, t) - verifyMember(r, a, t) - verifyMember(r, b, t) -} - -func TestFullWildcard(t *testing.T) { - s := New() - literal := []byte("a.b.c") - fwc := []byte("a.>") - a, b := "a", "b" - s.Insert(literal, a) - s.Insert(fwc, b) - r := s.Match(literal) - verifyLen(r, 2, t) - verifyMember(r, a, t) - verifyMember(r, b, t) -} - -func TestRemove(t *testing.T) { - s := New() - literal := []byte("a.b.c.d") - value := "foo" - s.Insert(literal, value) - verifyCount(s, 1, t) - s.Remove(literal, "bar") - verifyCount(s, 1, t) - s.Remove([]byte("a.b.c"), value) - verifyCount(s, 1, t) - s.Remove(literal, value) - verifyCount(s, 0, t) - r := s.Match(literal) - verifyLen(r, 0, t) -} - -func TestRemoveWildcard(t *testing.T) { - s := New() - literal := []byte("a.b.c.d") - pwc := []byte("a.b.*.d") - fwc := []byte("a.b.>") - value := "foo" - s.Insert(pwc, value) - s.Insert(fwc, value) - s.Insert(literal, value) - verifyCount(s, 3, t) - r := s.Match(literal) - verifyLen(r, 3, t) - s.Remove(literal, value) - verifyCount(s, 2, t) - s.Remove(fwc, value) - verifyCount(s, 1, t) - s.Remove(pwc, value) - verifyCount(s, 0, t) -} - -func TestRemoveCleanup(t *testing.T) { - s := New() - literal := []byte("a.b.c.d.e.f") - depth := len(bytes.Split(literal, []byte("."))) - value := "foo" - verifyNumLevels(s, 0, t) - s.Insert(literal, value) - verifyNumLevels(s, depth, t) - s.Remove(literal, value) - verifyNumLevels(s, 0, t) -} - -func TestRemoveCleanupWildcards(t *testing.T) { - s := New() - literal := []byte("a.b.*.d.e.>") - depth := len(bytes.Split(literal, []byte("."))) - value := "foo" - verifyNumLevels(s, 0, t) - s.Insert(literal, value) - verifyNumLevels(s, depth, t) - s.Remove(literal, value) - verifyNumLevels(s, 0, t) -} - -func TestInvalidSubjectsInsert(t *testing.T) { - s := New() - - // Insert, or subscribtions, can have wildcards, but not empty tokens, - // and can not have a FWC that is not terminal - - // beginning empty token - if err := s.Insert([]byte(".foo"), '@'); err != ErrInvalidSubject { - t.Fatal("Expected invalid subject error") - } - - // trailing empty token - if err := s.Insert([]byte("foo."), '@'); err != ErrInvalidSubject { - t.Fatal("Expected invalid subject error") - } - // empty middle token - if err := s.Insert([]byte("foo..bar"), '@'); err != ErrInvalidSubject { - t.Fatal("Expected invalid subject error") - } - // empty middle token #2 - if err := s.Insert([]byte("foo.bar..baz"), '@'); err != ErrInvalidSubject { - t.Fatal("Expected invalid subject error") - } - // fwc not terminal - if err := s.Insert([]byte("foo.>.bar"), '@'); err != ErrInvalidSubject { - t.Fatal("Expected invalid subject error") - } -} - -func TestCacheBehavior(t *testing.T) { - s := New() - literal := []byte("a.b.c") - fwc := []byte("a.>") - a, b := "a", "b" - s.Insert(literal, a) - r := s.Match(literal) - verifyLen(r, 1, t) - s.Insert(fwc, b) - r = s.Match(literal) - verifyLen(r, 2, t) - verifyMember(r, a, t) - verifyMember(r, b, t) - s.Remove(fwc, b) - r = s.Match(literal) - verifyLen(r, 1, t) - verifyMember(r, a, t) -} - -func checkBool(b, expected bool, t *testing.T) { - if b != expected { - debug.PrintStack() - t.Fatalf("Expected %v, but got %v\n", expected, b) - } -} - -func TestValidLiteralSubjects(t *testing.T) { - checkBool(IsValidLiteralSubject([]byte("foo")), true, t) - checkBool(IsValidLiteralSubject([]byte(".foo")), false, t) - checkBool(IsValidLiteralSubject([]byte("foo.")), false, t) - checkBool(IsValidLiteralSubject([]byte("foo..bar")), false, t) - checkBool(IsValidLiteralSubject([]byte("foo.bar.*")), false, t) - checkBool(IsValidLiteralSubject([]byte("foo.bar.>")), false, t) - checkBool(IsValidLiteralSubject([]byte("*")), false, t) - checkBool(IsValidLiteralSubject([]byte(">")), false, t) -} - -func TestMatchLiterals(t *testing.T) { - checkBool(matchLiteral([]byte("foo"), []byte("foo")), true, t) - checkBool(matchLiteral([]byte("foo"), []byte("bar")), false, t) - checkBool(matchLiteral([]byte("foo"), []byte("*")), true, t) - checkBool(matchLiteral([]byte("foo"), []byte(">")), true, t) - checkBool(matchLiteral([]byte("foo.bar"), []byte(">")), true, t) - checkBool(matchLiteral([]byte("foo.bar"), []byte("foo.>")), true, t) - checkBool(matchLiteral([]byte("foo.bar"), []byte("bar.>")), false, t) - checkBool(matchLiteral([]byte("stats.test.22"), []byte("stats.>")), true, t) - checkBool(matchLiteral([]byte("stats.test.22"), []byte("stats.*.*")), true, t) - checkBool(matchLiteral([]byte("foo.bar"), []byte("foo")), false, t) - checkBool(matchLiteral([]byte("stats.test.foos"), []byte("stats.test.foos")), true, t) - checkBool(matchLiteral([]byte("stats.test.foos"), []byte("stats.test.foo")), false, t) -} - -func TestCacheBounds(t *testing.T) { - s := New() - s.Insert([]byte("cache.>"), "foo") - - tmpl := "cache.test.%d" - loop := s.cmax + 100 - - for i := 0; i < loop; i++ { - sub := []byte(fmt.Sprintf(tmpl, i)) - s.Match(sub) - } - cs := int(s.cache.Count()) - if cs > s.cmax { - t.Fatalf("Cache is growing past limit: %d vs %d\n", cs, s.cmax) - } -} - -func TestStats(t *testing.T) { - s := New() - s.Insert([]byte("stats.>"), "fwc") - tmpl := "stats.test.%d" - loop := 255 - total := uint32(loop + 1) - - for i := 0; i < loop; i++ { - sub := []byte(fmt.Sprintf(tmpl, i)) - s.Insert(sub, "l") - } - - stats := s.Stats() - if time.Since(stats.StatsTime) > 50*time.Millisecond { - t.Fatalf("StatsTime seems incorrect: %+v\n", stats.StatsTime) - } - if stats.NumSubs != total { - t.Fatalf("Wrong stats for NumSubs: %d vs %d\n", stats.NumSubs, total) - } - if stats.NumInserts != uint64(total) { - t.Fatalf("Wrong stats for NumInserts: %d vs %d\n", stats.NumInserts, total) - } - if stats.NumRemoves != 0 { - t.Fatalf("Wrong stats for NumRemoves: %d vs %d\n", stats.NumRemoves, 0) - } - if stats.NumMatches != 0 { - t.Fatalf("Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, 0) - } - - for i := 0; i < loop; i++ { - s.Match([]byte("stats.test.22")) - } - s.Insert([]byte("stats.*.*"), "pwc") - s.Match([]byte("stats.test.22")) - - stats = s.Stats() - if stats.NumMatches != uint64(loop+1) { - t.Fatalf("Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, loop+1) - } - expectedCacheHitRate := 255.0 / 256.0 - if stats.CacheHitRate != expectedCacheHitRate { - t.Fatalf("Wrong stats for CacheHitRate: %.3g vs %0.3g\n", stats.CacheHitRate, expectedCacheHitRate) - } - if stats.MaxFanout != 3 { - t.Fatalf("Wrong stats for MaxFanout: %d vs %d\n", stats.MaxFanout, 3) - } - if stats.AvgFanout != 3.0 { - t.Fatalf("Wrong stats for AvgFanout: %g vs %g\n", stats.AvgFanout, 3.0) - } - s.ResetStats() - stats = s.Stats() - if time.Since(stats.StatsTime) > 50*time.Millisecond { - t.Fatalf("After Reset: StatsTime seems incorrect: %+v\n", stats.StatsTime) - } - if stats.NumInserts != 0 { - t.Fatalf("After Reset: Wrong stats for NumInserts: %d vs %d\n", stats.NumInserts, 0) - } - if stats.NumRemoves != 0 { - t.Fatalf("After Reset: Wrong stats for NumRemoves: %d vs %d\n", stats.NumRemoves, 0) - } - if stats.NumMatches != 0 { - t.Fatalf("After Reset: Wrong stats for NumMatches: %d vs %d\n", stats.NumMatches, 0) - } - if stats.CacheHitRate != 0.0 { - t.Fatalf("After Reset: Wrong stats for CacheHitRate: %.3g vs %0.3g\n", stats.CacheHitRate, 0.0) - } -} - -func TestResultSetSnapshots(t *testing.T) { - // Make sure result sets do not change out from underneath of us. - - literal := []byte("a.b.c.d.e.f") - wc := []byte("a.b.c.>") - value := "xxx" - - s := New() - s.Insert(literal, value) - - r := s.Match(literal) - verifyLen(r, 1, t) - - s.Insert(wc, value) - s.Insert(wc, value) - verifyLen(r, 1, t) - - s.Remove(wc, value) - verifyLen(r, 1, t) -} - -func TestBadSubjectOnRemove(t *testing.T) { - bad := []byte("a.b..d") - value := "bad" - - s := New() - if err := s.Insert(bad, value); err != ErrInvalidSubject { - t.Fatalf("Expected ErrInvalidSubject, got %v\n", err) - } - - if err := s.Remove(bad, value); err != ErrInvalidSubject { - t.Fatalf("Expected ErrInvalidSubject, got %v\n", err) - } - - badfwc := []byte("a.>.b") - if err := s.Remove(badfwc, value); err != ErrInvalidSubject { - t.Fatalf("Expected ErrInvalidSubject, got %v\n", err) - } -} - -// This is from bug report #18 -func TestTwoTokenPubMatchSingleTokenSub(t *testing.T) { - s := New() - val := "a" - sub := []byte("foo") - s.Insert(sub, val) - r := s.Match(sub) - verifyLen(r, 1, t) - verifyMember(r, val, t) - r = s.Match([]byte("foo.bar")) - verifyLen(r, 0, t) -} - -// -- Benchmarks Setup -- - -var subs [][]byte -var toks = []string{"apcera", "continuum", "component", "router", "api", "imgr", "jmgr", "auth"} -var sl = New() -var results = make([]interface{}, 0, 64) - -func init() { - subs = make([][]byte, 0, 256*1024) - subsInit("") - for i := 0; i < len(subs); i++ { - sl.Insert(subs[i], subs[i]) - } - addWildcards() -} - -func subsInit(pre string) { - var sub string - for _, t := range toks { - if len(pre) > 0 { - sub = pre + "." + t - } else { - sub = t - } - subs = append(subs, []byte(sub)) - if len(strings.Split(sub, ".")) < 5 { - subsInit(sub) - } - } -} - -func addWildcards() { - sl.Insert([]byte("cloud.>"), "paas") - sl.Insert([]byte("cloud.continuum.component.>"), "health") - sl.Insert([]byte("cloud.*.*.router.*"), "traffic") -} - -// -- Benchmarks Setup End -- - -func Benchmark______________________Insert(b *testing.B) { - s := New() - for i, l := 0, len(subs); i < b.N; i++ { - index := i % l - s.Insert(subs[index], subs[index]) - } -} - -func Benchmark____________MatchSingleToken(b *testing.B) { - s := []byte("apcera") - for i := 0; i < b.N; i++ { - sl.Match(s) - } -} - -func Benchmark______________MatchTwoTokens(b *testing.B) { - s := []byte("apcera.continuum") - for i := 0; i < b.N; i++ { - sl.Match(s) - } -} - -func Benchmark____________MatchThreeTokens(b *testing.B) { - s := []byte("apcera.continuum.component") - for i := 0; i < b.N; i++ { - sl.Match(s) - } -} - -func Benchmark_____________MatchFourTokens(b *testing.B) { - s := []byte("apcera.continuum.component.router") - for i := 0; i < b.N; i++ { - sl.Match(s) - } -} - -func Benchmark_MatchFourTokensSingleResult(b *testing.B) { - s := []byte("apcera.continuum.component.router") - for i := 0; i < b.N; i++ { - sl.Match(s) - } -} - -func Benchmark_MatchFourTokensMultiResults(b *testing.B) { - s := []byte("cloud.continuum.component.router") - for i := 0; i < b.N; i++ { - sl.Match(s) - } -} - -func Benchmark_______MissOnLastTokenOfFive(b *testing.B) { - s := []byte("apcera.continuum.component.router.ZZZZ") - for i := 0; i < b.N; i++ { - sl.Match(s) - } -} - -func multiRead(b *testing.B, num int) { - b.StopTimer() - var swg, fwg sync.WaitGroup - swg.Add(num) - fwg.Add(num) - s := []byte("apcera.continuum.component.router") - for i := 0; i < num; i++ { - go func() { - swg.Done() - swg.Wait() - for i := 0; i < b.N; i++ { - sl.Match(s) - } - fwg.Done() - }() - } - swg.Wait() - b.StartTimer() - fwg.Wait() -} - -func Benchmark_____________10XMultipleReads(b *testing.B) { - multiRead(b, 10) -} - -func Benchmark____________100XMultipleReads(b *testing.B) { - multiRead(b, 100) -} - -func _BenchmarkRSS(b *testing.B) { - runtime.GC() - var m runtime.MemStats - runtime.ReadMemStats(&m) - println("HEAP:", m.HeapObjects) - println("ALLOC:", m.Alloc) - println("TOTAL ALLOC:", m.TotalAlloc) - time.Sleep(30 * 1e9) -}