mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
We were trying to be too smart to save space at the expense of encoding time for filestore.
Revert back to very simple but way faster method. Sometimes 100x faster and only ~8% size increase. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -6887,8 +6887,21 @@ func (fs *fileStore) fileStoreConfig() FileStoreConfig {
|
||||
return fs.fcfg
|
||||
}
|
||||
|
||||
// When we will write a run length encoded record vs adding to the existing avl.SequenceSet.
|
||||
const rlThresh = 4096
|
||||
// Read lock all existing message blocks.
|
||||
// Lock held on entry.
|
||||
func (fs *fileStore) readLockAllMsgBlocks() {
|
||||
for _, mb := range fs.blks {
|
||||
mb.mu.RLock()
|
||||
}
|
||||
}
|
||||
|
||||
// Read unlock all existing message blocks.
|
||||
// Lock held on entry.
|
||||
func (fs *fileStore) readUnlockAllMsgBlocks() {
|
||||
for _, mb := range fs.blks {
|
||||
mb.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Binary encoded state snapshot, >= v2.10 server.
|
||||
func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {
|
||||
@@ -6919,6 +6932,10 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {
|
||||
|
||||
if numDeleted > 0 {
|
||||
var scratch [4 * 1024]byte
|
||||
|
||||
fs.readLockAllMsgBlocks()
|
||||
defer fs.readUnlockAllMsgBlocks()
|
||||
|
||||
for _, db := range fs.deleteBlocks() {
|
||||
switch db := db.(type) {
|
||||
case *DeleteRange:
|
||||
@@ -6943,68 +6960,23 @@ func (fs *fileStore) EncodedStreamState(failed uint64) ([]byte, error) {
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
// We used to be more sophisticated to save memory, but speed is more important.
|
||||
// All blocks should be at least read locked.
|
||||
func (fs *fileStore) deleteBlocks() DeleteBlocks {
|
||||
var (
|
||||
dbs DeleteBlocks
|
||||
adm *avl.SequenceSet
|
||||
prevLast uint64
|
||||
)
|
||||
var dbs DeleteBlocks
|
||||
var prevLast uint64
|
||||
|
||||
for _, mb := range fs.blks {
|
||||
mb.mu.RLock()
|
||||
// Detect if we have a gap between these blocks.
|
||||
if prevLast > 0 && prevLast+1 != mb.first.seq {
|
||||
// Detect if we need to encode a run length encoding here.
|
||||
if gap := mb.first.seq - prevLast - 1; gap > rlThresh {
|
||||
// Check if we have a running adm, if so write that out first, or if contigous update rle params.
|
||||
if min, max, num := adm.State(); num > 0 {
|
||||
// Check if we are all contingous.
|
||||
if num == max-min+1 {
|
||||
prevLast, gap = min-1, mb.first.seq-min
|
||||
} else {
|
||||
dbs = append(dbs, adm)
|
||||
}
|
||||
// Always nil out here.
|
||||
adm = nil
|
||||
}
|
||||
dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap})
|
||||
} else {
|
||||
// Common dmap
|
||||
if adm == nil {
|
||||
adm = &avl.SequenceSet{}
|
||||
adm.SetInitialMin(prevLast + 1)
|
||||
}
|
||||
for seq := prevLast + 1; seq < mb.first.seq; seq++ {
|
||||
adm.Insert(seq)
|
||||
}
|
||||
}
|
||||
gap := mb.first.seq - prevLast - 1
|
||||
dbs = append(dbs, &DeleteRange{First: prevLast + 1, Num: gap})
|
||||
}
|
||||
if min, max, num := mb.dmap.State(); num > 0 {
|
||||
// Check in case the mb's dmap is contiguous and over our threshold.
|
||||
if num == max-min+1 && num > rlThresh {
|
||||
// Need to write out adm if it exists.
|
||||
if adm != nil && adm.Size() > 0 {
|
||||
dbs = append(dbs, adm)
|
||||
adm = nil
|
||||
}
|
||||
dbs = append(dbs, &DeleteRange{First: min, Num: max - min + 1})
|
||||
} else {
|
||||
// Aggregated dmap
|
||||
if adm == nil {
|
||||
adm = mb.dmap.Clone()
|
||||
} else {
|
||||
adm.Union(&mb.dmap)
|
||||
}
|
||||
}
|
||||
if mb.dmap.Size() > 0 {
|
||||
dbs = append(dbs, &mb.dmap)
|
||||
}
|
||||
prevLast = mb.last.seq
|
||||
mb.mu.RUnlock()
|
||||
}
|
||||
|
||||
if adm != nil {
|
||||
dbs = append(dbs, adm)
|
||||
}
|
||||
|
||||
return dbs
|
||||
}
|
||||
|
||||
@@ -7013,9 +6985,13 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
|
||||
if len(dbs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
fs.mu.Lock()
|
||||
defer fs.mu.Unlock()
|
||||
|
||||
var needsCheck DeleteBlocks
|
||||
|
||||
fs.readLockAllMsgBlocks()
|
||||
mdbs := fs.deleteBlocks()
|
||||
for i, db := range dbs {
|
||||
// If the block is same as what we have we can skip.
|
||||
@@ -7027,6 +7003,11 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
|
||||
}
|
||||
}
|
||||
// Need to insert these.
|
||||
needsCheck = append(needsCheck, db)
|
||||
}
|
||||
fs.readUnlockAllMsgBlocks()
|
||||
|
||||
for _, db := range needsCheck {
|
||||
db.Range(func(dseq uint64) bool {
|
||||
fs.removeMsg(dseq, false, true, false)
|
||||
return true
|
||||
|
||||
Reference in New Issue
Block a user