SequenceSet is an AVL tree with variable bitmask nodes to contain large delete maps for streams.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-01-17 20:12:43 -08:00
parent 09afcee9d9
commit 1f6aa94405
4 changed files with 1064 additions and 0 deletions

199
server/avl/norace_test.go Normal file
View File

@@ -0,0 +1,199 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !race && !skip_no_race_tests
// +build !race,!skip_no_race_tests
package avl
import (
"flag"
"fmt"
"math"
"math/rand"
"runtime"
"runtime/debug"
"testing"
"time"
)
// Print Results: go test -v --args --results
var printResults = flag.Bool("results", false, "Enable Results Logging")
// SequenceSet memory tests vs dmaps.
func TestNoRaceSeqSetSizeComparison(t *testing.T) {
// Create 5M random entries (dupes possible but ok for this test) out of 8M range.
num := 5_000_000
max := 7_000_000
seqs := make([]uint64, 0, num)
for i := 0; i < num; i++ {
n := uint64(rand.Int63n(int64(max + 1)))
seqs = append(seqs, n)
}
runtime.GC()
// Disable to get stable results.
gcp := debug.SetGCPercent(-1)
defer debug.SetGCPercent(gcp)
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
inUseBefore := mem.HeapInuse
dmap := make(map[uint64]struct{}, num)
for _, n := range seqs {
dmap[n] = struct{}{}
}
runtime.ReadMemStats(&mem)
dmapUse := mem.HeapInuse - inUseBefore
inUseBefore = mem.HeapInuse
// Now do SequenceSet on same dataset.
var sset SequenceSet
for _, n := range seqs {
sset.Insert(n)
}
runtime.ReadMemStats(&mem)
seqSetUse := mem.HeapInuse - inUseBefore
if seqSetUse > 2*1024*1024 {
t.Fatalf("Expected SequenceSet size to be < 2M, got %v", friendlyBytes(seqSetUse))
}
if seqSetUse*50 > dmapUse {
t.Fatalf("Expected SequenceSet to be at least 50x better then dmap approach: %v vs %v",
friendlyBytes(seqSetUse),
friendlyBytes(dmapUse),
)
}
}
func TestNoRaceSeqSetEncodeLarge(t *testing.T) {
num := 2_500_000
max := 5_000_000
dmap := make(map[uint64]struct{}, num)
var ss SequenceSet
for i := 0; i < num; i++ {
n := uint64(rand.Int63n(int64(max + 1)))
ss.Insert(n)
dmap[n] = struct{}{}
}
// Disable to get stable results.
gcp := debug.SetGCPercent(-1)
defer debug.SetGCPercent(gcp)
// In general should be about the same, but can see some variability.
expected := 500 * time.Microsecond
start := time.Now()
b, err := ss.Encode(nil)
require_NoError(t, err)
if elapsed := time.Since(start); elapsed > expected {
t.Fatalf("Expected encode of %d items with encoded size %v to take less than %v, got %v",
num, friendlyBytes(len(b)), expected, elapsed)
} else {
logResults("Encode time for %d items was %v, encoded size is %v\n", num, elapsed, friendlyBytes(len(b)))
}
start = time.Now()
ss2, err := Decode(b)
require_NoError(t, err)
if elapsed := time.Since(start); elapsed > expected {
t.Fatalf("Expected decode to take less than %v, got %v", expected, elapsed)
} else {
logResults("Decode time is %v\n", elapsed)
}
require_True(t, ss.Nodes() == ss2.Nodes())
}
func TestNoRaceSeqSetRelativeSpeed(t *testing.T) {
// Create 1M random entries (dupes possible but ok for this test) out of 3M range.
num := 1_000_000
max := 3_000_000
seqs := make([]uint64, 0, num)
for i := 0; i < num; i++ {
n := uint64(rand.Int63n(int64(max + 1)))
seqs = append(seqs, n)
}
start := time.Now()
// Now do SequenceSet on same dataset.
var sset SequenceSet
for _, n := range seqs {
sset.Insert(n)
}
ssInsertElapsed := time.Since(start)
logResults("Inserts SequenceSet: %v for %d items\n", ssInsertElapsed, num)
start = time.Now()
for _, n := range seqs {
if ok := sset.Exists(n); !ok {
t.Fatalf("Should exist")
}
}
ssLookupElapsed := time.Since(start)
logResults("Lookups: %v\n", ssLookupElapsed)
// Now do a map.
dmap := make(map[uint64]struct{})
start = time.Now()
for _, n := range seqs {
dmap[n] = struct{}{}
}
mapInsertElapsed := time.Since(start)
logResults("Inserts Map[uint64]: %v for %d items\n", mapInsertElapsed, num)
start = time.Now()
for _, n := range seqs {
if _, ok := dmap[n]; !ok {
t.Fatalf("Should exist")
}
}
mapLookupElapsed := time.Since(start)
logResults("Lookups: %v\n", mapLookupElapsed)
// In general we are between 1.5 and 1.75 times slower atm then a straight map.
// Let's test an upper bound of 2x for now.
if mapInsertElapsed*2 <= ssInsertElapsed {
t.Fatalf("Expected SequenceSet insert to be no more than 2x slower (%v vs %v)", mapInsertElapsed, ssInsertElapsed)
}
if mapLookupElapsed*2 <= ssLookupElapsed {
t.Fatalf("Expected SequenceSet lookups to be no more than 2x slower (%v vs %v)", mapLookupElapsed, ssLookupElapsed)
}
}
// friendlyBytes returns a string with the given bytes int64
// represented as a size, such as 1KB, 10MB, etc...
func friendlyBytes[T int | uint64 | int64](bytes T) string {
fbytes := float64(bytes)
base := 1024
pre := []string{"K", "M", "G", "T", "P", "E"}
if fbytes < float64(base) {
return fmt.Sprintf("%v B", fbytes)
}
exp := int(math.Log(fbytes) / math.Log(float64(base)))
index := exp - 1
return fmt.Sprintf("%.2f %sB", fbytes/math.Pow(float64(base), float64(exp)), pre[index])
}
func logResults(format string, args ...interface{}) {
if *printResults {
fmt.Printf(format, args...)
}
}

544
server/avl/seqset.go Normal file
View File

@@ -0,0 +1,544 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package avl
import (
"encoding/binary"
"errors"
"math/bits"
"sort"
)
// SequenceSet is a memory and encoding optimized set for storing unsigned ints.
//
// SequenceSet is ~80-100 times more efficient memory wise than a map[uint64]struct{}.
// SequenceSet is ~1.75 times slower at inserts than the same map.
// SequenceSet is not thread safe.
//
// We use an AVL tree with nodes that hold bitmasks for set membership.
//
// Encoding will convert to a space optimized encoding using bitmasks.
type SequenceSet struct {
root *node // root node
size int // number of items
nodes int // number of nodes
// Having this here vs on the stack in Insert/Delete
// makes a difference in memory usage.
changed bool
}
// Insert will insert the sequence into the set.
// The tree will be balanced inline.
func (ss *SequenceSet) Insert(seq uint64) {
if ss.root = ss.root.insert(seq, &ss.changed, &ss.nodes); ss.changed {
ss.changed = false
ss.size++
}
}
// Exists will return true iff the sequence is a member of this set.
func (ss *SequenceSet) Exists(seq uint64) bool {
for n := ss.root; n != nil; {
if seq < n.base {
n = n.l
continue
} else if seq >= n.base+numEntries {
n = n.r
continue
}
return n.exists(seq)
}
return false
}
// Delete will remove the sequence from the set.
// Wil optionally remove nodes and rebalance.
func (ss *SequenceSet) Delete(seq uint64) {
if ss == nil || ss.root == nil {
return
}
ss.root = ss.root.delete(seq, &ss.changed, &ss.nodes)
if ss.changed {
ss.changed = false
ss.size--
}
}
// Size returns the number of items in the set.
func (ss *SequenceSet) Size() int {
return ss.size
}
// Nodes returns the number of nodes in the tree.
func (ss *SequenceSet) Nodes() int {
return ss.nodes
}
// Empty will clear all items from a set.
func (ss *SequenceSet) Empty() {
ss.root = nil
ss.size = 0
ss.nodes = 0
}
// IsEmpty is a fast check of the set being empty.
func (ss *SequenceSet) IsEmpty() bool {
if ss == nil || ss.root == nil {
return true
}
return false
}
// Range will invoke the given function for each item in the set.
// They will range over the set in ascending order.
// If the callback returns false we terminate the iteration.
func (ss *SequenceSet) Range(f func(uint64) bool) {
ss.root.iter(f)
}
// Heights returns the left and right heights of the tree.
func (ss *SequenceSet) Heights() (l, r int) {
if ss.root == nil {
return 0, 0
}
if ss.root.l != nil {
l = ss.root.l.h
}
if ss.root.r != nil {
r = ss.root.r.h
}
return l, r
}
// MinMax will return the minunum and maximum values in the set.
func (ss *SequenceSet) MinMax() (min, max uint64) {
if ss.root == nil {
return 0, 0
}
for l := ss.root; l != nil; l = l.l {
if l.l == nil {
min = l.min()
}
}
for r := ss.root; r != nil; r = r.r {
if r.r == nil {
max = r.max()
}
}
return min, max
}
func clone(src *node, target **node) {
if src == nil {
return
}
n := &node{base: src.base, bits: src.bits, h: src.h}
*target = n
clone(src.l, &n.l)
clone(src.r, &n.r)
}
// Clone will return a clone of the given SequenceSet.
func (ss *SequenceSet) Clone() *SequenceSet {
if ss == nil {
return nil
}
css := &SequenceSet{nodes: ss.nodes, size: ss.size}
clone(ss.root, &css.root)
return css
}
// Union will return a union of all sets.
func Union(ssa ...*SequenceSet) *SequenceSet {
if len(ssa) == 0 {
return nil
}
// Sort so we can clone largest.
sort.Slice(ssa, func(i, j int) bool { return ssa[i].Size() > ssa[j].Size() })
ss := ssa[0].Clone()
// Insert the rest through range call.
for i := 1; i < len(ssa); i++ {
ssa[i].Range(func(n uint64) bool {
ss.Insert(n)
return true
})
}
return ss
}
const (
// Magic is used to identify the encode binary state..
magic = uint8(22)
// Version
version = uint8(1)
// hdrLen
hdrLen = 2
// minimum length of an encoded SequenceSet.
minLen = 2 + 4 // magic + version + num nodes.
)
// EncodeLen returns the bytes needed for encoding.
func (ss SequenceSet) EncodeLen() int {
return minLen + (ss.Nodes() * ((numBuckets+1)*8 + 2))
}
func (ss SequenceSet) Encode(buf []byte) ([]byte, error) {
nn, encLen := ss.Nodes(), ss.EncodeLen()
if len(buf) < encLen {
buf = make([]byte, encLen)
}
// TODO(dlc) - Go 1.19 introduced Append to not have to keep track.
// Once 1.20 is out we could change this over.
// Also binary.Write() is way slower, do not use.
var le = binary.LittleEndian
buf[0], buf[1] = magic, version
i := hdrLen
le.PutUint32(buf[i:], uint32(nn))
i += 4
ss.root.nodeIter(func(n *node) {
le.PutUint64(buf[i:], n.base)
i += 8
for _, b := range n.bits {
le.PutUint64(buf[i:], b)
i += 8
}
le.PutUint16(buf[i:], uint16(n.h))
i += 2
})
return buf[:i], nil
}
// ErrBadEncoding is returned when we can not decode properly.
var ErrBadEncoding = errors.New("ss: bad encoding")
func Decode(buf []byte) (*SequenceSet, error) {
if len(buf) < minLen || buf[0] != magic || buf[1] != version {
return nil, ErrBadEncoding
}
var le = binary.LittleEndian
index := 2
nn := int(le.Uint32(buf[index:]))
index += 4
expectedLen := minLen + (nn * ((numBuckets+1)*8 + 2))
if len(buf) != expectedLen {
return nil, ErrBadEncoding
}
nodes := make([]node, nn)
var ss SequenceSet
for i := 0; i < nn; i++ {
n := &nodes[i]
n.base = le.Uint64(buf[index:])
index += 8
for bi := range n.bits {
n.bits[bi] = le.Uint64(buf[index:])
index += 8
}
n.h = int(le.Uint16(buf[index:]))
index += 2
ss.insertNode(n)
}
return &ss, nil
}
// insertNode places a decoded node into the tree.
// These should be done in tree order as defined by Encode()
// This allows us to not have to calculate height or do rebalancing.
// So much better performance this way.
func (ss *SequenceSet) insertNode(n *node) {
ss.nodes++
if ss.root == nil {
ss.root = n
return
}
// Walk our way to the insertion point.
for p := ss.root; p != nil; {
if n.base < p.base {
if p.l == nil {
p.l = n
return
}
p = p.l
} else {
if p.r == nil {
p.r = n
return
}
p = p.r
}
}
}
const (
bitsPerBucket = 64 // bits in uint64
numBuckets = 64
numEntries = numBuckets * bitsPerBucket
)
type node struct {
//v dvalue
base uint64
bits [numBuckets]uint64
l *node
r *node
h int
}
// Set the proper bit.
// seq should have already been qualified and inserted should be non nil.
func (n *node) set(seq uint64, inserted *bool) {
seq -= n.base
i := seq / bitsPerBucket
mask := uint64(1) << (seq % bitsPerBucket)
if (n.bits[i] & mask) == 0 {
n.bits[i] |= mask
*inserted = true
}
}
func (n *node) insert(seq uint64, inserted *bool, nodes *int) *node {
if n == nil {
base := (seq / numEntries) * numEntries
n := &node{base: base, h: 1}
n.set(seq, inserted)
*nodes++
return n
}
if seq < n.base {
n.l = n.l.insert(seq, inserted, nodes)
} else if seq >= n.base+numEntries {
n.r = n.r.insert(seq, inserted, nodes)
} else {
n.set(seq, inserted)
}
n.h = maxH(n) + 1
// Don't make a function, impacts performance.
if bf := balanceF(n); bf > 1 {
// Left unbalanced.
if n.l.base+numEntries > seq {
return n.rotateR()
} else {
n.l = n.l.rotateL()
return n.rotateR()
}
} else if bf < -1 {
// right unbalanced.
if n.r.base+numEntries > seq {
n.r = n.r.rotateR()
return n.rotateL()
} else {
return n.rotateL()
}
}
return n
}
func (n *node) rotateL() *node {
r := n.r
if r != nil {
n.r = r.l
r.l = n
n.h = maxH(n) + 1
r.h = maxH(r) + 1
} else {
n.r = nil
n.h = maxH(n) + 1
}
return r
}
func (n *node) rotateR() *node {
l := n.l
if l != nil {
n.l = l.r
l.r = n
n.h = maxH(n) + 1
l.h = maxH(l) + 1
} else {
n.l = nil
n.h = maxH(n) + 1
}
return l
}
func balanceF(n *node) int {
if n == nil {
return 0
}
var lh, rh int
if n.l != nil {
lh = n.l.h
}
if n.r != nil {
rh = n.r.h
}
return lh - rh
}
func maxH(n *node) int {
var lh, rh int
if n.l != nil {
lh = n.l.h
}
if n.r != nil {
rh = n.r.h
}
if lh > rh {
return lh
}
return rh
}
// Clear the proper bit.
// seq should have already been qualified and deleted should be non nil.
// Will return true if this node is now empty.
func (n *node) clear(seq uint64, deleted *bool) bool {
seq -= n.base
i := seq / bitsPerBucket
mask := uint64(1) << (seq % bitsPerBucket)
if (n.bits[i] & mask) != 0 {
n.bits[i] &^= mask
*deleted = true
}
for _, b := range n.bits {
if b != 0 {
return false
}
}
return true
}
func (n *node) delete(seq uint64, deleted *bool, nodes *int) *node {
if n == nil {
return nil
}
if seq < n.base {
n.l = n.l.delete(seq, deleted, nodes)
} else if seq >= n.base+numEntries {
n.r = n.r.delete(seq, deleted, nodes)
} else if empty := n.clear(seq, deleted); empty {
*nodes--
if nn := n.l; nn == nil {
n = n.r
} else if nn.r == nil {
nn.r = n.r
n = nn
} else {
nn.r.r = n.r
n = nn
}
}
// Check balance.
if bf := balanceF(n); bf > 1 {
// Left unbalanced.
if n.l.base+numEntries > seq {
return n.rotateR()
} else {
n.l = n.l.rotateL()
return n.rotateR()
}
} else if bf < -1 {
// right unbalanced.
if n.r.base+numEntries > seq {
n.r = n.r.rotateR()
return n.rotateL()
} else {
return n.rotateL()
}
}
return n
}
func (n *node) exists(seq uint64) bool {
seq -= n.base
i := seq / bitsPerBucket
mask := uint64(1) << (seq % bitsPerBucket)
return n.bits[i]&mask != 0
}
// Return minimum sequence in the set.
// This node can not be empty.
func (n *node) min() uint64 {
for i, b := range n.bits {
if b != 0 {
return n.base +
uint64(i*bitsPerBucket) +
uint64(bits.TrailingZeros64(b))
}
}
return 0
}
// Return maximum sequence in the set.
// This node can not be empty.
func (n *node) max() uint64 {
for i := numBuckets - 1; i >= 0; i-- {
if b := n.bits[i]; b != 0 {
return n.base +
uint64(i*bitsPerBucket) +
uint64(bitsPerBucket-bits.LeadingZeros64(b>>1))
}
}
return 0
}
// This is done in tree order.
func (n *node) nodeIter(f func(n *node)) {
if n == nil {
return
}
f(n)
n.l.nodeIter(f)
n.r.nodeIter(f)
}
// iter will iterate through the set's items in this node.
// If the supplied function returns false we terminate the iteration.
func (n *node) iter(f func(uint64) bool) bool {
if n == nil {
return true
}
if ok := n.l.iter(f); !ok {
return false
}
for num := n.base; num < n.base+numEntries; num++ {
if n.exists(num) {
if ok := f(num); !ok {
return false
}
}
}
if ok := n.r.iter(f); !ok {
return false
}
return true
}

233
server/avl/seqset_test.go Normal file
View File

@@ -0,0 +1,233 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package avl
import (
"math/rand"
"testing"
)
func TestSeqSetBasics(t *testing.T) {
var ss SequenceSet
seqs := []uint64{22, 222, 2222, 2, 2, 4}
for _, seq := range seqs {
ss.Insert(seq)
require_True(t, ss.Exists(seq))
}
require_True(t, ss.Nodes() == 1)
require_True(t, ss.Size() == len(seqs)-1) // We have one dup in there.
lh, rh := ss.Heights()
require_True(t, lh == 0)
require_True(t, rh == 0)
}
func TestSeqSetLeftLean(t *testing.T) {
var ss SequenceSet
for i := uint64(4 * numEntries); i > 0; i-- {
ss.Insert(i)
}
require_True(t, ss.Nodes() == 5)
require_True(t, ss.Size() == 4*numEntries)
lh, rh := ss.Heights()
require_True(t, lh == 2)
require_True(t, rh == 1)
}
func TestSeqSetRightLean(t *testing.T) {
var ss SequenceSet
for i := uint64(0); i < uint64(4*numEntries); i++ {
ss.Insert(i)
}
require_True(t, ss.Nodes() == 4)
require_True(t, ss.Size() == 4*numEntries)
lh, rh := ss.Heights()
require_True(t, lh == 1)
require_True(t, rh == 2)
}
func TestSeqSetCorrectness(t *testing.T) {
// Generate 100k sequences across 500k range.
num := 100_000
max := 500_000
set := make(map[uint64]struct{}, num)
var ss SequenceSet
for i := 0; i < num; i++ {
n := uint64(rand.Int63n(int64(max + 1)))
ss.Insert(n)
set[n] = struct{}{}
}
for i := uint64(0); i <= uint64(max); i++ {
_, exists := set[i]
require_True(t, ss.Exists(i) == exists)
}
}
func TestSeqSetRange(t *testing.T) {
num := 2*numEntries + 22
nums := make([]uint64, 0, num)
for i := 0; i < num; i++ {
nums = append(nums, uint64(i))
}
rand.Shuffle(len(nums), func(i, j int) { nums[i], nums[j] = nums[j], nums[i] })
var ss SequenceSet
for _, n := range nums {
ss.Insert(n)
}
nums = nums[:0]
ss.Range(func(n uint64) bool {
nums = append(nums, n)
return true
})
require_True(t, len(nums) == num)
for i := uint64(0); i < uint64(num); i++ {
require_True(t, nums[i] == i)
}
// Test truncating the range call.
nums = nums[:0]
ss.Range(func(n uint64) bool {
if n >= 10 {
return false
}
nums = append(nums, n)
return true
})
require_True(t, len(nums) == 10)
for i := uint64(0); i < 10; i++ {
require_True(t, nums[i] == i)
}
}
func TestSeqSetDelete(t *testing.T) {
var ss SequenceSet
// Simple single node.
seqs := []uint64{22, 222, 2222, 2, 2, 4}
for _, seq := range seqs {
ss.Insert(seq)
}
for _, seq := range seqs {
ss.Delete(seq)
require_True(t, !ss.Exists(seq))
}
require_True(t, ss.root == nil)
num := 22*numEntries + 22
nums := make([]uint64, 0, num)
for i := 0; i < num; i++ {
nums = append(nums, uint64(i))
}
rand.Shuffle(len(nums), func(i, j int) { nums[i], nums[j] = nums[j], nums[i] })
for _, n := range nums {
ss.Insert(n)
}
for _, n := range nums {
ss.Delete(n)
require_True(t, !ss.Exists(n))
}
require_True(t, ss.root == nil)
}
func TestSeqSetMinMax(t *testing.T) {
var ss SequenceSet
// Simple single node.
seqs := []uint64{22, 222, 2222, 2, 2, 4}
for _, seq := range seqs {
ss.Insert(seq)
}
min, max := ss.MinMax()
require_True(t, min == 2 && max == 2222)
// Multi-node
ss.Empty()
num := 22*numEntries + 22
nums := make([]uint64, 0, num)
for i := 0; i < num; i++ {
nums = append(nums, uint64(i))
}
rand.Shuffle(len(nums), func(i, j int) { nums[i], nums[j] = nums[j], nums[i] })
for _, n := range nums {
ss.Insert(n)
}
min, max = ss.MinMax()
require_True(t, min == 0 && max == uint64(num-1))
}
func TestSeqSetClone(t *testing.T) {
// Generate 100k sequences across 500k range.
num := 100_000
max := 500_000
var ss SequenceSet
for i := 0; i < num; i++ {
ss.Insert(uint64(rand.Int63n(int64(max + 1))))
}
ssc := ss.Clone()
require_True(t, ss.Size() == ssc.Size())
require_True(t, ss.Nodes() == ssc.Nodes())
}
func TestSeqSetUnion(t *testing.T) {
var ss1, ss2 SequenceSet
seqs1 := []uint64{22, 222, 2222, 2, 2, 4}
for _, seq := range seqs1 {
ss1.Insert(seq)
}
seqs2 := []uint64{33, 333, 3333, 3, 33_333, 333_333}
for _, seq := range seqs2 {
ss2.Insert(seq)
}
ss := Union(&ss1, &ss2)
require_True(t, ss.Size() == 11)
seqs := append(seqs1, seqs2...)
for _, n := range seqs {
require_True(t, ss.Exists(n))
}
}
func require_NoError(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatalf("require no error, but got: %v", err)
}
}
func require_True(t *testing.T, b bool) {
t.Helper()
if !b {
t.Fatalf("require true")
}
}

View File

@@ -48,6 +48,7 @@ import (
"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
"github.com/nats-io/nuid"
@@ -6214,6 +6215,93 @@ func TestNoRaceFileStoreStreamMaxAgePerformance(t *testing.T) {
fmt.Printf("%.0f msgs/sec\n", float64(num)/elapsed.Seconds())
}
// SequenceSet memory tests vs dmaps.
func TestNoRaceSeqSetSizeComparison(t *testing.T) {
// Create 5M random entries (dupes possible but ok for this test) out of 8M range.
num := 5_000_000
max := 7_000_000
seqs := make([]uint64, 0, num)
for i := 0; i < num; i++ {
n := uint64(rand.Int63n(int64(max + 1)))
seqs = append(seqs, n)
}
runtime.GC()
// Disable to get stable results.
gcp := debug.SetGCPercent(-1)
defer debug.SetGCPercent(gcp)
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
inUseBefore := mem.HeapInuse
dmap := make(map[uint64]struct{}, num)
for _, n := range seqs {
dmap[n] = struct{}{}
}
runtime.ReadMemStats(&mem)
dmapUse := mem.HeapInuse - inUseBefore
inUseBefore = mem.HeapInuse
// Now do SequenceSet on same dataset.
var sset avl.SequenceSet
for _, n := range seqs {
sset.Insert(n)
}
runtime.ReadMemStats(&mem)
seqSetUse := mem.HeapInuse - inUseBefore
if seqSetUse > 2*1024*1024 {
t.Fatalf("Expected SequenceSet size to be < 2M, got %v", friendlyBytes(int64(seqSetUse)))
}
if seqSetUse*50 > dmapUse {
t.Fatalf("Expected SequenceSet to be at least 50x better then dmap approach: %v vs %v",
friendlyBytes(int64(seqSetUse)),
friendlyBytes(int64(dmapUse)),
)
}
}
// FilteredState for ">" with large interior deletes was very slow.
func TestNoRaceFileStoreFilteredStateWithLargeDeletes(t *testing.T) {
storeDir := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: storeDir, BlockSize: 4096},
StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage},
)
require_NoError(t, err)
defer fs.Stop()
subj, msg := "foo", []byte("Hello World")
toStore := 500_000
for i := 0; i < toStore; i++ {
_, _, err := fs.StoreMsg(subj, nil, msg)
require_NoError(t, err)
}
// Now delete every other one.
for seq := 2; seq <= toStore; seq += 2 {
_, err := fs.RemoveMsg(uint64(seq))
require_NoError(t, err)
}
runtime.GC()
// Disable to get stable results.
gcp := debug.SetGCPercent(-1)
defer debug.SetGCPercent(gcp)
start := time.Now()
fss := fs.FilteredState(1, _EMPTY_)
elapsed := time.Since(start)
require_True(t, fss.Msgs == uint64(toStore/2))
require_True(t, elapsed < 500*time.Microsecond)
}
// ConsumerInfo seems to being called quite a bit more than we had anticipated.
// Under certain circumstances, since we reset num pending, this can be very costly.
// We will use the fast path to alleviate that performance bottleneck but also make