diff --git a/server/avl/norace_test.go b/server/avl/norace_test.go new file mode 100644 index 00000000..7c18fb70 --- /dev/null +++ b/server/avl/norace_test.go @@ -0,0 +1,199 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !race && !skip_no_race_tests +// +build !race,!skip_no_race_tests + +package avl + +import ( + "flag" + "fmt" + "math" + "math/rand" + "runtime" + "runtime/debug" + "testing" + "time" +) + +// Print Results: go test -v --args --results +var printResults = flag.Bool("results", false, "Enable Results Logging") + +// SequenceSet memory tests vs dmaps. +func TestNoRaceSeqSetSizeComparison(t *testing.T) { + // Create 5M random entries (dupes possible but ok for this test) out of 8M range. + num := 5_000_000 + max := 7_000_000 + + seqs := make([]uint64, 0, num) + for i := 0; i < num; i++ { + n := uint64(rand.Int63n(int64(max + 1))) + seqs = append(seqs, n) + } + + runtime.GC() + // Disable to get stable results. + gcp := debug.SetGCPercent(-1) + defer debug.SetGCPercent(gcp) + + mem := runtime.MemStats{} + runtime.ReadMemStats(&mem) + inUseBefore := mem.HeapInuse + + dmap := make(map[uint64]struct{}, num) + for _, n := range seqs { + dmap[n] = struct{}{} + } + runtime.ReadMemStats(&mem) + dmapUse := mem.HeapInuse - inUseBefore + inUseBefore = mem.HeapInuse + + // Now do SequenceSet on same dataset. 
+ var sset SequenceSet + for _, n := range seqs { + sset.Insert(n) + } + + runtime.ReadMemStats(&mem) + seqSetUse := mem.HeapInuse - inUseBefore + + if seqSetUse > 2*1024*1024 { + t.Fatalf("Expected SequenceSet size to be < 2M, got %v", friendlyBytes(seqSetUse)) + } + if seqSetUse*50 > dmapUse { + t.Fatalf("Expected SequenceSet to be at least 50x better then dmap approach: %v vs %v", + friendlyBytes(seqSetUse), + friendlyBytes(dmapUse), + ) + } +} + +func TestNoRaceSeqSetEncodeLarge(t *testing.T) { + num := 2_500_000 + max := 5_000_000 + + dmap := make(map[uint64]struct{}, num) + var ss SequenceSet + for i := 0; i < num; i++ { + n := uint64(rand.Int63n(int64(max + 1))) + ss.Insert(n) + dmap[n] = struct{}{} + } + + // Disable to get stable results. + gcp := debug.SetGCPercent(-1) + defer debug.SetGCPercent(gcp) + + // In general should be about the same, but can see some variability. + expected := 500 * time.Microsecond + + start := time.Now() + b, err := ss.Encode(nil) + require_NoError(t, err) + + if elapsed := time.Since(start); elapsed > expected { + t.Fatalf("Expected encode of %d items with encoded size %v to take less than %v, got %v", + num, friendlyBytes(len(b)), expected, elapsed) + } else { + logResults("Encode time for %d items was %v, encoded size is %v\n", num, elapsed, friendlyBytes(len(b))) + } + + start = time.Now() + ss2, err := Decode(b) + require_NoError(t, err) + if elapsed := time.Since(start); elapsed > expected { + t.Fatalf("Expected decode to take less than %v, got %v", expected, elapsed) + } else { + logResults("Decode time is %v\n", elapsed) + } + require_True(t, ss.Nodes() == ss2.Nodes()) +} + +func TestNoRaceSeqSetRelativeSpeed(t *testing.T) { + // Create 1M random entries (dupes possible but ok for this test) out of 3M range. 
+ num := 1_000_000 + max := 3_000_000 + + seqs := make([]uint64, 0, num) + for i := 0; i < num; i++ { + n := uint64(rand.Int63n(int64(max + 1))) + seqs = append(seqs, n) + } + + start := time.Now() + // Now do SequenceSet on same dataset. + var sset SequenceSet + for _, n := range seqs { + sset.Insert(n) + } + ssInsertElapsed := time.Since(start) + logResults("Inserts SequenceSet: %v for %d items\n", ssInsertElapsed, num) + + start = time.Now() + for _, n := range seqs { + if ok := sset.Exists(n); !ok { + t.Fatalf("Should exist") + } + } + ssLookupElapsed := time.Since(start) + logResults("Lookups: %v\n", ssLookupElapsed) + + // Now do a map. + dmap := make(map[uint64]struct{}) + start = time.Now() + for _, n := range seqs { + dmap[n] = struct{}{} + } + mapInsertElapsed := time.Since(start) + logResults("Inserts Map[uint64]: %v for %d items\n", mapInsertElapsed, num) + + start = time.Now() + for _, n := range seqs { + if _, ok := dmap[n]; !ok { + t.Fatalf("Should exist") + } + } + mapLookupElapsed := time.Since(start) + logResults("Lookups: %v\n", mapLookupElapsed) + + // In general we are between 1.5 and 1.75 times slower atm than a straight map. + // Let's test an upper bound of 2x for now. + if mapInsertElapsed*2 <= ssInsertElapsed { + t.Fatalf("Expected SequenceSet insert to be no more than 2x slower (%v vs %v)", mapInsertElapsed, ssInsertElapsed) + } + + if mapLookupElapsed*2 <= ssLookupElapsed { + t.Fatalf("Expected SequenceSet lookups to be no more than 2x slower (%v vs %v)", mapLookupElapsed, ssLookupElapsed) + } +} + +// friendlyBytes returns a string with the given bytes int64 +// represented as a size, such as 1KB, 10MB, etc... 
+func friendlyBytes[T int | uint64 | int64](bytes T) string { + fbytes := float64(bytes) + base := 1024 + pre := []string{"K", "M", "G", "T", "P", "E"} + if fbytes < float64(base) { + return fmt.Sprintf("%v B", fbytes) + } + exp := int(math.Log(fbytes) / math.Log(float64(base))) + index := exp - 1 + return fmt.Sprintf("%.2f %sB", fbytes/math.Pow(float64(base), float64(exp)), pre[index]) +} + +func logResults(format string, args ...interface{}) { + if *printResults { + fmt.Printf(format, args...) + } +} diff --git a/server/avl/seqset.go b/server/avl/seqset.go new file mode 100644 index 00000000..c0187706 --- /dev/null +++ b/server/avl/seqset.go @@ -0,0 +1,544 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avl + +import ( + "encoding/binary" + "errors" + "math/bits" + "sort" +) + +// SequenceSet is a memory and encoding optimized set for storing unsigned ints. +// +// SequenceSet is ~80-100 times more efficient memory wise than a map[uint64]struct{}. +// SequenceSet is ~1.75 times slower at inserts than the same map. +// SequenceSet is not thread safe. +// +// We use an AVL tree with nodes that hold bitmasks for set membership. +// +// Encoding will convert to a space optimized encoding using bitmasks. +type SequenceSet struct { + root *node // root node + size int // number of items + nodes int // number of nodes + // Having this here vs on the stack in Insert/Delete + // makes a difference in memory usage. 
+ changed bool +} + +// Insert will insert the sequence into the set. +// The tree will be balanced inline. +func (ss *SequenceSet) Insert(seq uint64) { + if ss.root = ss.root.insert(seq, &ss.changed, &ss.nodes); ss.changed { + ss.changed = false + ss.size++ + } +} + +// Exists will return true iff the sequence is a member of this set. +func (ss *SequenceSet) Exists(seq uint64) bool { + for n := ss.root; n != nil; { + if seq < n.base { + n = n.l + continue + } else if seq >= n.base+numEntries { + n = n.r + continue + } + return n.exists(seq) + } + return false +} + +// Delete will remove the sequence from the set. +// Will optionally remove nodes and rebalance. +func (ss *SequenceSet) Delete(seq uint64) { + if ss == nil || ss.root == nil { + return + } + ss.root = ss.root.delete(seq, &ss.changed, &ss.nodes) + if ss.changed { + ss.changed = false + ss.size-- + } +} + +// Size returns the number of items in the set. +func (ss *SequenceSet) Size() int { + return ss.size +} + +// Nodes returns the number of nodes in the tree. +func (ss *SequenceSet) Nodes() int { + return ss.nodes +} + +// Empty will clear all items from a set. +func (ss *SequenceSet) Empty() { + ss.root = nil + ss.size = 0 + ss.nodes = 0 +} + +// IsEmpty is a fast check of the set being empty. +func (ss *SequenceSet) IsEmpty() bool { + if ss == nil || ss.root == nil { + return true + } + return false +} + +// Range will invoke the given function for each item in the set. +// They will range over the set in ascending order. +// If the callback returns false we terminate the iteration. +func (ss *SequenceSet) Range(f func(uint64) bool) { + ss.root.iter(f) +} + +// Heights returns the left and right heights of the tree. +func (ss *SequenceSet) Heights() (l, r int) { + if ss.root == nil { + return 0, 0 + } + if ss.root.l != nil { + l = ss.root.l.h + } + if ss.root.r != nil { + r = ss.root.r.h + } + return l, r +} + +// MinMax will return the minimum and maximum values in the set. 
+func (ss *SequenceSet) MinMax() (min, max uint64) { + if ss.root == nil { + return 0, 0 + } + for l := ss.root; l != nil; l = l.l { + if l.l == nil { + min = l.min() + } + } + for r := ss.root; r != nil; r = r.r { + if r.r == nil { + max = r.max() + } + } + return min, max +} + +func clone(src *node, target **node) { + if src == nil { + return + } + n := &node{base: src.base, bits: src.bits, h: src.h} + *target = n + clone(src.l, &n.l) + clone(src.r, &n.r) +} + +// Clone will return a clone of the given SequenceSet. +func (ss *SequenceSet) Clone() *SequenceSet { + if ss == nil { + return nil + } + css := &SequenceSet{nodes: ss.nodes, size: ss.size} + clone(ss.root, &css.root) + + return css +} + +// Union will return a union of all sets. +func Union(ssa ...*SequenceSet) *SequenceSet { + if len(ssa) == 0 { + return nil + } + // Sort so we can clone largest. + sort.Slice(ssa, func(i, j int) bool { return ssa[i].Size() > ssa[j].Size() }) + ss := ssa[0].Clone() + + // Insert the rest through range call. + for i := 1; i < len(ssa); i++ { + ssa[i].Range(func(n uint64) bool { + ss.Insert(n) + return true + }) + } + return ss +} + +const ( + // Magic is used to identify the encode binary state.. + magic = uint8(22) + // Version + version = uint8(1) + // hdrLen + hdrLen = 2 + // minimum length of an encoded SequenceSet. + minLen = 2 + 4 // magic + version + num nodes. +) + +// EncodeLen returns the bytes needed for encoding. +func (ss SequenceSet) EncodeLen() int { + return minLen + (ss.Nodes() * ((numBuckets+1)*8 + 2)) +} + +func (ss SequenceSet) Encode(buf []byte) ([]byte, error) { + nn, encLen := ss.Nodes(), ss.EncodeLen() + + if len(buf) < encLen { + buf = make([]byte, encLen) + } + + // TODO(dlc) - Go 1.19 introduced Append to not have to keep track. + // Once 1.20 is out we could change this over. + // Also binary.Write() is way slower, do not use. 
+ + var le = binary.LittleEndian + buf[0], buf[1] = magic, version + i := hdrLen + le.PutUint32(buf[i:], uint32(nn)) + i += 4 + ss.root.nodeIter(func(n *node) { + le.PutUint64(buf[i:], n.base) + i += 8 + for _, b := range n.bits { + le.PutUint64(buf[i:], b) + i += 8 + } + le.PutUint16(buf[i:], uint16(n.h)) + i += 2 + }) + return buf[:i], nil +} + +// ErrBadEncoding is returned when we can not decode properly. +var ErrBadEncoding = errors.New("ss: bad encoding") + +func Decode(buf []byte) (*SequenceSet, error) { + if len(buf) < minLen || buf[0] != magic || buf[1] != version { + return nil, ErrBadEncoding + } + + var le = binary.LittleEndian + index := 2 + nn := int(le.Uint32(buf[index:])) + index += 4 + + expectedLen := minLen + (nn * ((numBuckets+1)*8 + 2)) + if len(buf) != expectedLen { + return nil, ErrBadEncoding + } + + nodes := make([]node, nn) + + var ss SequenceSet + for i := 0; i < nn; i++ { + n := &nodes[i] + n.base = le.Uint64(buf[index:]) + index += 8 + for bi := range n.bits { + n.bits[bi] = le.Uint64(buf[index:]) + index += 8 + } + n.h = int(le.Uint16(buf[index:])) + index += 2 + ss.insertNode(n) + } + + return &ss, nil +} + +// insertNode places a decoded node into the tree. +// These should be done in tree order as defined by Encode() +// This allows us to not have to calculate height or do rebalancing. +// So much better performance this way. +func (ss *SequenceSet) insertNode(n *node) { + ss.nodes++ + + if ss.root == nil { + ss.root = n + return + } + // Walk our way to the insertion point. + for p := ss.root; p != nil; { + if n.base < p.base { + if p.l == nil { + p.l = n + return + } + p = p.l + } else { + if p.r == nil { + p.r = n + return + } + p = p.r + } + } +} + +const ( + bitsPerBucket = 64 // bits in uint64 + numBuckets = 64 + numEntries = numBuckets * bitsPerBucket +) + +type node struct { + //v dvalue + base uint64 + bits [numBuckets]uint64 + l *node + r *node + h int +} + +// Set the proper bit. 
+// seq should have already been qualified and inserted should be non nil. +func (n *node) set(seq uint64, inserted *bool) { + seq -= n.base + i := seq / bitsPerBucket + mask := uint64(1) << (seq % bitsPerBucket) + if (n.bits[i] & mask) == 0 { + n.bits[i] |= mask + *inserted = true + } +} + +func (n *node) insert(seq uint64, inserted *bool, nodes *int) *node { + if n == nil { + base := (seq / numEntries) * numEntries + n := &node{base: base, h: 1} + n.set(seq, inserted) + *nodes++ + return n + } + + if seq < n.base { + n.l = n.l.insert(seq, inserted, nodes) + } else if seq >= n.base+numEntries { + n.r = n.r.insert(seq, inserted, nodes) + } else { + n.set(seq, inserted) + } + + n.h = maxH(n) + 1 + + // Don't make a function, impacts performance. + if bf := balanceF(n); bf > 1 { + // Left unbalanced. + if n.l.base+numEntries > seq { + return n.rotateR() + } else { + n.l = n.l.rotateL() + return n.rotateR() + } + } else if bf < -1 { + // right unbalanced. + if n.r.base+numEntries > seq { + n.r = n.r.rotateR() + return n.rotateL() + } else { + return n.rotateL() + } + } + return n +} + +func (n *node) rotateL() *node { + r := n.r + if r != nil { + n.r = r.l + r.l = n + n.h = maxH(n) + 1 + r.h = maxH(r) + 1 + } else { + n.r = nil + n.h = maxH(n) + 1 + } + return r +} + +func (n *node) rotateR() *node { + l := n.l + if l != nil { + n.l = l.r + l.r = n + n.h = maxH(n) + 1 + l.h = maxH(l) + 1 + } else { + n.l = nil + n.h = maxH(n) + 1 + } + return l +} + +func balanceF(n *node) int { + if n == nil { + return 0 + } + var lh, rh int + if n.l != nil { + lh = n.l.h + } + if n.r != nil { + rh = n.r.h + } + return lh - rh +} + +func maxH(n *node) int { + var lh, rh int + if n.l != nil { + lh = n.l.h + } + if n.r != nil { + rh = n.r.h + } + if lh > rh { + return lh + } + return rh +} + +// Clear the proper bit. +// seq should have already been qualified and deleted should be non nil. +// Will return true if this node is now empty. 
+func (n *node) clear(seq uint64, deleted *bool) bool { + seq -= n.base + i := seq / bitsPerBucket + mask := uint64(1) << (seq % bitsPerBucket) + if (n.bits[i] & mask) != 0 { + n.bits[i] &^= mask + *deleted = true + } + for _, b := range n.bits { + if b != 0 { + return false + } + } + return true +} + +func (n *node) delete(seq uint64, deleted *bool, nodes *int) *node { + if n == nil { + return nil + } + + if seq < n.base { + n.l = n.l.delete(seq, deleted, nodes) + } else if seq >= n.base+numEntries { + n.r = n.r.delete(seq, deleted, nodes) + } else if empty := n.clear(seq, deleted); empty { + *nodes-- + if nn := n.l; nn == nil { + n = n.r + } else if nn.r == nil { + nn.r = n.r + n = nn + } else { + nn.r.r = n.r + n = nn + } + } + + // Check balance. + if bf := balanceF(n); bf > 1 { + // Left unbalanced. + if n.l.base+numEntries > seq { + return n.rotateR() + } else { + n.l = n.l.rotateL() + return n.rotateR() + } + } else if bf < -1 { + // right unbalanced. + if n.r.base+numEntries > seq { + n.r = n.r.rotateR() + return n.rotateL() + } else { + return n.rotateL() + } + } + + return n +} + +func (n *node) exists(seq uint64) bool { + seq -= n.base + i := seq / bitsPerBucket + mask := uint64(1) << (seq % bitsPerBucket) + return n.bits[i]&mask != 0 +} + +// Return minimum sequence in the set. +// This node can not be empty. +func (n *node) min() uint64 { + for i, b := range n.bits { + if b != 0 { + return n.base + + uint64(i*bitsPerBucket) + + uint64(bits.TrailingZeros64(b)) + } + } + return 0 +} + +// Return maximum sequence in the set. +// This node can not be empty. +func (n *node) max() uint64 { + for i := numBuckets - 1; i >= 0; i-- { + if b := n.bits[i]; b != 0 { + return n.base + + uint64(i*bitsPerBucket) + + uint64(bitsPerBucket-bits.LeadingZeros64(b>>1)) + } + } + return 0 +} + +// This is done in tree order. 
+func (n *node) nodeIter(f func(n *node)) { + if n == nil { + return + } + f(n) + n.l.nodeIter(f) + n.r.nodeIter(f) +} + +// iter will iterate through the set's items in this node. +// If the supplied function returns false we terminate the iteration. +func (n *node) iter(f func(uint64) bool) bool { + if n == nil { + return true + } + + if ok := n.l.iter(f); !ok { + return false + } + for num := n.base; num < n.base+numEntries; num++ { + if n.exists(num) { + if ok := f(num); !ok { + return false + } + } + } + if ok := n.r.iter(f); !ok { + return false + } + + return true +} diff --git a/server/avl/seqset_test.go b/server/avl/seqset_test.go new file mode 100644 index 00000000..8a90971d --- /dev/null +++ b/server/avl/seqset_test.go @@ -0,0 +1,233 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avl + +import ( + "math/rand" + "testing" +) + +func TestSeqSetBasics(t *testing.T) { + var ss SequenceSet + + seqs := []uint64{22, 222, 2222, 2, 2, 4} + for _, seq := range seqs { + ss.Insert(seq) + require_True(t, ss.Exists(seq)) + } + + require_True(t, ss.Nodes() == 1) + require_True(t, ss.Size() == len(seqs)-1) // We have one dup in there. 
+ lh, rh := ss.Heights() + require_True(t, lh == 0) + require_True(t, rh == 0) +} + +func TestSeqSetLeftLean(t *testing.T) { + var ss SequenceSet + + for i := uint64(4 * numEntries); i > 0; i-- { + ss.Insert(i) + } + + require_True(t, ss.Nodes() == 5) + require_True(t, ss.Size() == 4*numEntries) + lh, rh := ss.Heights() + require_True(t, lh == 2) + require_True(t, rh == 1) +} + +func TestSeqSetRightLean(t *testing.T) { + var ss SequenceSet + + for i := uint64(0); i < uint64(4*numEntries); i++ { + ss.Insert(i) + } + + require_True(t, ss.Nodes() == 4) + require_True(t, ss.Size() == 4*numEntries) + lh, rh := ss.Heights() + require_True(t, lh == 1) + require_True(t, rh == 2) +} + +func TestSeqSetCorrectness(t *testing.T) { + // Generate 100k sequences across 500k range. + num := 100_000 + max := 500_000 + + set := make(map[uint64]struct{}, num) + var ss SequenceSet + for i := 0; i < num; i++ { + n := uint64(rand.Int63n(int64(max + 1))) + ss.Insert(n) + set[n] = struct{}{} + } + + for i := uint64(0); i <= uint64(max); i++ { + _, exists := set[i] + require_True(t, ss.Exists(i) == exists) + } +} + +func TestSeqSetRange(t *testing.T) { + num := 2*numEntries + 22 + nums := make([]uint64, 0, num) + for i := 0; i < num; i++ { + nums = append(nums, uint64(i)) + } + rand.Shuffle(len(nums), func(i, j int) { nums[i], nums[j] = nums[j], nums[i] }) + + var ss SequenceSet + for _, n := range nums { + ss.Insert(n) + } + + nums = nums[:0] + ss.Range(func(n uint64) bool { + nums = append(nums, n) + return true + }) + require_True(t, len(nums) == num) + for i := uint64(0); i < uint64(num); i++ { + require_True(t, nums[i] == i) + } + + // Test truncating the range call. 
+ nums = nums[:0] + ss.Range(func(n uint64) bool { + if n >= 10 { + return false + } + nums = append(nums, n) + return true + }) + require_True(t, len(nums) == 10) + for i := uint64(0); i < 10; i++ { + require_True(t, nums[i] == i) + } +} + +func TestSeqSetDelete(t *testing.T) { + var ss SequenceSet + + // Simple single node. + seqs := []uint64{22, 222, 2222, 2, 2, 4} + for _, seq := range seqs { + ss.Insert(seq) + } + + for _, seq := range seqs { + ss.Delete(seq) + require_True(t, !ss.Exists(seq)) + } + require_True(t, ss.root == nil) + + num := 22*numEntries + 22 + nums := make([]uint64, 0, num) + for i := 0; i < num; i++ { + nums = append(nums, uint64(i)) + } + rand.Shuffle(len(nums), func(i, j int) { nums[i], nums[j] = nums[j], nums[i] }) + + for _, n := range nums { + ss.Insert(n) + } + + for _, n := range nums { + ss.Delete(n) + require_True(t, !ss.Exists(n)) + } + require_True(t, ss.root == nil) +} + +func TestSeqSetMinMax(t *testing.T) { + var ss SequenceSet + + // Simple single node. + seqs := []uint64{22, 222, 2222, 2, 2, 4} + for _, seq := range seqs { + ss.Insert(seq) + } + + min, max := ss.MinMax() + require_True(t, min == 2 && max == 2222) + + // Multi-node + ss.Empty() + + num := 22*numEntries + 22 + nums := make([]uint64, 0, num) + for i := 0; i < num; i++ { + nums = append(nums, uint64(i)) + } + rand.Shuffle(len(nums), func(i, j int) { nums[i], nums[j] = nums[j], nums[i] }) + for _, n := range nums { + ss.Insert(n) + } + + min, max = ss.MinMax() + require_True(t, min == 0 && max == uint64(num-1)) +} + +func TestSeqSetClone(t *testing.T) { + // Generate 100k sequences across 500k range. 
+ num := 100_000 + max := 500_000 + + var ss SequenceSet + for i := 0; i < num; i++ { + ss.Insert(uint64(rand.Int63n(int64(max + 1)))) + } + + ssc := ss.Clone() + require_True(t, ss.Size() == ssc.Size()) + require_True(t, ss.Nodes() == ssc.Nodes()) +} + +func TestSeqSetUnion(t *testing.T) { + var ss1, ss2 SequenceSet + + seqs1 := []uint64{22, 222, 2222, 2, 2, 4} + for _, seq := range seqs1 { + ss1.Insert(seq) + } + + seqs2 := []uint64{33, 333, 3333, 3, 33_333, 333_333} + for _, seq := range seqs2 { + ss2.Insert(seq) + } + + ss := Union(&ss1, &ss2) + require_True(t, ss.Size() == 11) + + seqs := append(seqs1, seqs2...) + for _, n := range seqs { + require_True(t, ss.Exists(n)) + } +} + +func require_NoError(t *testing.T, err error) { + t.Helper() + if err != nil { + t.Fatalf("require no error, but got: %v", err) + } +} + +func require_True(t *testing.T, b bool) { + t.Helper() + if !b { + t.Fatalf("require true") + } +} diff --git a/server/norace_test.go b/server/norace_test.go index 4804ce78..e6acf5b6 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -48,6 +48,7 @@ import ( "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" + "github.com/nats-io/nats-server/v2/server/avl" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" "github.com/nats-io/nuid" @@ -6214,6 +6215,93 @@ func TestNoRaceFileStoreStreamMaxAgePerformance(t *testing.T) { fmt.Printf("%.0f msgs/sec\n", float64(num)/elapsed.Seconds()) } +// SequenceSet memory tests vs dmaps. +func TestNoRaceSeqSetSizeComparison(t *testing.T) { + // Create 5M random entries (dupes possible but ok for this test) out of 8M range. + num := 5_000_000 + max := 7_000_000 + + seqs := make([]uint64, 0, num) + for i := 0; i < num; i++ { + n := uint64(rand.Int63n(int64(max + 1))) + seqs = append(seqs, n) + } + + runtime.GC() + // Disable to get stable results. 
+ gcp := debug.SetGCPercent(-1) + defer debug.SetGCPercent(gcp) + + mem := runtime.MemStats{} + runtime.ReadMemStats(&mem) + inUseBefore := mem.HeapInuse + + dmap := make(map[uint64]struct{}, num) + for _, n := range seqs { + dmap[n] = struct{}{} + } + runtime.ReadMemStats(&mem) + dmapUse := mem.HeapInuse - inUseBefore + inUseBefore = mem.HeapInuse + + // Now do SequenceSet on same dataset. + var sset avl.SequenceSet + for _, n := range seqs { + sset.Insert(n) + } + + runtime.ReadMemStats(&mem) + seqSetUse := mem.HeapInuse - inUseBefore + + if seqSetUse > 2*1024*1024 { + t.Fatalf("Expected SequenceSet size to be < 2M, got %v", friendlyBytes(int64(seqSetUse))) + } + if seqSetUse*50 > dmapUse { + t.Fatalf("Expected SequenceSet to be at least 50x better then dmap approach: %v vs %v", + friendlyBytes(int64(seqSetUse)), + friendlyBytes(int64(dmapUse)), + ) + } +} + +// FilteredState for ">" with large interior deletes was very slow. +func TestNoRaceFileStoreFilteredStateWithLargeDeletes(t *testing.T) { + storeDir := t.TempDir() + + fs, err := newFileStore( + FileStoreConfig{StoreDir: storeDir, BlockSize: 4096}, + StreamConfig{Name: "zzz", Subjects: []string{"foo"}, Storage: FileStorage}, + ) + require_NoError(t, err) + defer fs.Stop() + + subj, msg := "foo", []byte("Hello World") + + toStore := 500_000 + for i := 0; i < toStore; i++ { + _, _, err := fs.StoreMsg(subj, nil, msg) + require_NoError(t, err) + } + + // Now delete every other one. + for seq := 2; seq <= toStore; seq += 2 { + _, err := fs.RemoveMsg(uint64(seq)) + require_NoError(t, err) + } + + runtime.GC() + // Disable to get stable results. + gcp := debug.SetGCPercent(-1) + defer debug.SetGCPercent(gcp) + + start := time.Now() + fss := fs.FilteredState(1, _EMPTY_) + elapsed := time.Since(start) + + require_True(t, fss.Msgs == uint64(toStore/2)) + require_True(t, elapsed < 500*time.Microsecond) +} + // ConsumerInfo seems to being called quite a bit more than we had anticipated. 
// Under certain circumstances, since we reset num pending, this can be very costly. // We will use the fast path to alleviate that performance bottleneck but also make