Basic swap out of the old dmap (map[uint64]struct{}) for new avl.SequenceSet.

No other optimizations yet.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-01-21 09:03:11 -08:00
parent 1f6aa94405
commit 9b9159ab70
2 changed files with 53 additions and 81 deletions

View File

@@ -40,6 +40,7 @@ import (
"github.com/klauspost/compress/s2"
"github.com/minio/highwayhash"
"github.com/nats-io/nats-server/v2/server/avl"
"golang.org/x/crypto/chacha20"
"golang.org/x/crypto/chacha20poly1305"
)
@@ -185,7 +186,7 @@ type msgBlock struct {
cexp time.Duration
ctmr *time.Timer
werr error
dmap map[uint64]struct{}
dmap avl.SequenceSet
fch chan struct{}
qch chan struct{}
lchk [8]byte
@@ -995,13 +996,13 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
// We need to declare lost data here.
ld = &LostStreamData{Msgs: make([]uint64, 0, mb.msgs), Bytes: mb.bytes}
for seq := mb.first.seq; seq <= mb.last.seq; seq++ {
if _, ok := mb.dmap[seq]; !ok {
if !mb.dmap.Exists(seq) {
ld.Msgs = append(ld.Msgs, seq)
}
}
// Clear invalid state. We will let this blk be added in here.
mb.msgs, mb.bytes, mb.rbytes, mb.fss = 0, 0, 0, nil
mb.dmap = nil
mb.dmap.Empty()
mb.first.seq = mb.last.seq + 1
}
return ld, err
@@ -1033,10 +1034,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
if seq == 0 {
return
}
if mb.dmap == nil {
mb.dmap = make(map[uint64]struct{})
}
mb.dmap[seq] = struct{}{}
mb.dmap.Insert(seq)
}
var le = binary.LittleEndian
@@ -1121,10 +1119,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
}
var deleted bool
if mb.dmap != nil {
_, deleted = mb.dmap[seq]
}
deleted := mb.dmap.Exists(seq)
// Always set last.
mb.last.seq = seq
@@ -1361,11 +1356,8 @@ func (fs *fileStore) expireMsgsOnRecover() {
// Process interior deleted msgs.
if err == errDeletedMsg {
// Update dmap.
if len(mb.dmap) > 0 {
delete(mb.dmap, seq)
if len(mb.dmap) == 0 {
mb.dmap = nil
}
if mb.dmap.Exists(seq) {
mb.dmap.Delete(seq)
}
// Keep this updated just in case since we are removing dmap entries.
mb.first.seq, needNextFirst = seq, true
@@ -2447,10 +2439,7 @@ func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
}
} else {
needsRecord = true
if mb.dmap == nil {
mb.dmap = make(map[uint64]struct{})
}
mb.dmap[seq] = struct{}{}
mb.dmap.Insert(seq)
}
mb.mu.Unlock()
@@ -2732,12 +2721,10 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
}
// Now check dmap if it is there.
if mb.dmap != nil {
if _, ok := mb.dmap[seq]; ok {
mb.mu.Unlock()
fsUnlock()
return false, nil
}
if mb.dmap.Exists(seq) {
mb.mu.Unlock()
fsUnlock()
return false, nil
}
// We used to not have to load in the messages except with callbacks or the filtered subject state (which is now always on).
@@ -2803,14 +2790,11 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
}
} else if !isEmpty {
// Out of order delete.
if mb.dmap == nil {
mb.dmap = make(map[uint64]struct{})
}
mb.dmap[seq] = struct{}{}
mb.dmap.Insert(seq)
// Check if <25% utilization and minimum size met.
if mb.rbytes > compactMinimum && !isLastBlock {
// Remove the interior delete records
rbytes := mb.rbytes - uint64(len(mb.dmap)*emptyRecordLen)
rbytes := mb.rbytes - uint64(mb.dmap.Size()*emptyRecordLen)
if rbytes>>2 > mb.bytes {
mb.compact()
}
@@ -2922,11 +2906,7 @@ func (mb *msgBlock) compact() {
if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
return true
}
var deleted bool
if mb.dmap != nil {
_, deleted = mb.dmap[seq]
}
return deleted
return mb.dmap.Exists(seq)
}
// For skip msgs.
@@ -3009,9 +2989,9 @@ func (mb *msgBlock) compact() {
}
}
// Nil out our dmap.
// Empty out our dmap.
func (mb *msgBlock) deleteDmap() {
mb.dmap = nil
mb.dmap.Empty()
}
// Grab info from a slot.
@@ -3104,10 +3084,10 @@ func (mb *msgBlock) flushLoop(fch, qch chan struct{}) {
mb.mu.RLock()
defer mb.mu.RUnlock()
var changed bool
if firstSeq != mb.first.seq || lastSeq != mb.last.seq || dmapLen != len(mb.dmap) {
if firstSeq != mb.first.seq || lastSeq != mb.last.seq || dmapLen != mb.dmap.Size() {
changed = true
firstSeq, lastSeq = mb.first.seq, mb.last.seq
dmapLen = len(mb.dmap)
dmapLen = mb.dmap.Size()
}
return changed
}
@@ -3228,18 +3208,15 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) {
mb.mu.Lock()
checkDmap := len(mb.dmap) > 0
checkDmap := mb.dmap.Size() > 0
var smv StoreMsg
for seq := mb.last.seq; seq > sm.seq; seq-- {
if checkDmap {
if _, ok := mb.dmap[seq]; ok {
if mb.dmap.Exists(seq) {
// Delete and skip to next.
delete(mb.dmap, seq)
if len(mb.dmap) == 0 {
mb.dmap = nil
checkDmap = false
}
mb.dmap.Delete(seq)
checkDmap = !mb.dmap.IsEmpty()
continue
}
}
@@ -3346,9 +3323,9 @@ func (mb *msgBlock) isEmpty() bool {
func (mb *msgBlock) selectNextFirst() {
var seq uint64
for seq = mb.first.seq + 1; seq <= mb.last.seq; seq++ {
if _, ok := mb.dmap[seq]; ok {
if mb.dmap.Exists(seq) {
// We will move past this so we can delete the entry.
delete(mb.dmap, seq)
mb.dmap.Delete(seq)
} else {
break
}
@@ -4379,7 +4356,7 @@ func (mb *msgBlock) cacheAlreadyLoaded() bool {
if mb.cache == nil || mb.cache.off != 0 || mb.cache.fseq == 0 || len(mb.cache.buf) == 0 {
return false
}
numEntries := mb.msgs + uint64(len(mb.dmap)) + (mb.first.seq - mb.cache.fseq)
numEntries := mb.msgs + uint64(mb.dmap.Size()) + (mb.first.seq - mb.cache.fseq)
return numEntries == uint64(len(mb.cache.idx))
}
@@ -4566,8 +4543,8 @@ func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) {
}
// If we have a delete map check it.
if mb.dmap != nil {
if _, ok := mb.dmap[seq]; ok {
if !mb.dmap.IsEmpty() {
if mb.dmap.Exists(seq) {
return nil, errDeletedMsg
}
}
@@ -4958,13 +4935,15 @@ func (fs *fileStore) State() StreamState {
}
}
cur = mb.last.seq + 1 // Expected next first.
for seq := range mb.dmap {
mb.dmap.Range(func(seq uint64) bool {
if seq < fseq {
delete(mb.dmap, seq)
mb.dmap.Delete(seq)
} else {
state.Deleted = append(state.Deleted, seq)
}
}
return true
})
mb.mu.Unlock()
}
}
@@ -5058,11 +5037,11 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
n += binary.PutVarint(hdr[n:], mb.first.ts)
n += binary.PutUvarint(hdr[n:], mb.last.seq)
n += binary.PutVarint(hdr[n:], mb.last.ts)
n += binary.PutUvarint(hdr[n:], uint64(len(mb.dmap)))
n += binary.PutUvarint(hdr[n:], uint64(mb.dmap.Size()))
buf := append(hdr[:n], mb.lchk[:]...)
// Append a delete map if needed
if len(mb.dmap) > 0 {
if !mb.dmap.IsEmpty() {
buf = append(buf, mb.genDeleteMap()...)
}
@@ -5086,7 +5065,7 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
// Check if this will be a short write, and if so truncate before writing here.
// We only really need to truncate if we are encryptyed or we have dmap entries.
// If no dmap entries readIndexInfo does the right thing in the presence of extra data left over.
if int64(len(buf)) < mb.liwsz && (mb.aek != nil || len(mb.dmap) > 0) {
if int64(len(buf)) < mb.liwsz && (mb.aek != nil || !mb.dmap.IsEmpty()) {
if err := mb.ifd.Truncate(0); err != nil {
mb.werr = err
return err
@@ -5183,13 +5162,12 @@ func (mb *msgBlock) readIndexInfo() error {
// Now check for presence of a delete map
if dmapLen > 0 {
mb.dmap = make(map[uint64]struct{}, dmapLen)
for i := 0; i < int(dmapLen); i++ {
seq := readSeq()
if seq == 0 {
break
}
mb.dmap[seq+mb.first.seq] = struct{}{}
mb.dmap.Insert(seq + mb.first.seq)
}
}
@@ -5197,20 +5175,22 @@ func (mb *msgBlock) readIndexInfo() error {
}
func (mb *msgBlock) genDeleteMap() []byte {
if len(mb.dmap) == 0 {
if mb.dmap.IsEmpty() {
return nil
}
buf := make([]byte, len(mb.dmap)*binary.MaxVarintLen64)
buf := make([]byte, mb.dmap.Size()*binary.MaxVarintLen64)
// We use first seq as an offset to cut down on size.
fseq, n := uint64(mb.first.seq), 0
for seq := range mb.dmap {
mb.dmap.Range(func(seq uint64) bool {
// This is for lazy cleanup as the first sequence moves up.
if seq < fseq {
delete(mb.dmap, seq)
mb.dmap.Delete(seq)
} else {
n += binary.PutUvarint(buf[n:], seq-fseq)
}
}
return true
})
return buf[:n]
}
@@ -5256,7 +5236,7 @@ func (fs *fileStore) dmapEntries() int {
var total int
fs.mu.RLock()
for _, mb := range fs.blks {
total += len(mb.dmap)
total += mb.dmap.Size()
}
fs.mu.RUnlock()
return total
@@ -5378,10 +5358,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
}
} else {
// Out of order delete.
if mb.dmap == nil {
mb.dmap = make(map[uint64]struct{})
}
mb.dmap[seq] = struct{}{}
mb.dmap.Insert(seq)
}
if maxp > 0 && purged >= maxp {
@@ -5554,11 +5531,8 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
sm, err := smb.cacheLookup(mseq, &smv)
if err == errDeletedMsg {
// Update dmap.
if len(smb.dmap) > 0 {
delete(smb.dmap, mseq)
if len(smb.dmap) == 0 {
smb.dmap = nil
}
if !smb.dmap.IsEmpty() {
smb.dmap.Delete(seq)
}
} else if sm != nil {
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
@@ -6015,10 +5989,8 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
if seq&ebit != 0 {
continue
}
if len(mb.dmap) > 0 {
if _, ok := mb.dmap[seq]; ok {
continue
}
if mb.dmap.Exists(seq) {
continue
}
ss.First = seq
mb.fssNeedsWrite = true // Mark dirty

View File

@@ -3256,7 +3256,7 @@ func TestFileStoreExpireMsgsOnStart(t *testing.T) {
} else if mb.last != mbc.last {
errStr = fmt.Sprintf("last state does not match: %d vs %d", mb.last, mbc.last)
} else if !reflect.DeepEqual(mb.dmap, mbc.dmap) {
errStr = fmt.Sprintf("deleted map does not match: %d vs %d", mb.dmap, mbc.dmap)
errStr = fmt.Sprintf("deleted map does not match: %+v vs %+v", mb.dmap, mbc.dmap)
}
mb.mu.RUnlock()
if errStr != _EMPTY_ {
@@ -3993,7 +3993,7 @@ func TestFileStoreRebuildStateDmapAccountingBug(t *testing.T) {
t.Helper()
mb.mu.RLock()
defer mb.mu.RUnlock()
dmapLen := uint64(len(mb.dmap))
dmapLen := uint64(mb.dmap.Size())
if mb.msgs != (mb.last.seq-mb.first.seq+1)-dmapLen {
t.Fatalf("Consistency check failed: %d != %d -> last %d first %d len(dmap) %d",
mb.msgs, (mb.last.seq-mb.first.seq+1)-dmapLen, mb.last.seq, mb.first.seq, dmapLen)