mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fix for compaction with compression and added an out of band compaction in syncBlocks to reclaim more space.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -3491,14 +3491,13 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
|
||||
} else if !isEmpty {
|
||||
// Out of order delete.
|
||||
mb.dmap.Insert(seq)
|
||||
// Check if <25% utilization and minimum size met.
|
||||
if mb.rbytes > compactMinimum && !isLastBlock {
|
||||
// Remove the interior delete records
|
||||
rbytes := mb.rbytes - uint64(mb.dmap.Size()*emptyRecordLen)
|
||||
if rbytes>>2 > mb.bytes {
|
||||
mb.compact()
|
||||
fs.kickFlushStateLoop()
|
||||
}
|
||||
// Make simple check here similar to Compact(). If we can save 50% and over a certain threshold do inline.
|
||||
// All other more thorough cleanup will happen in syncBlocks logic.
|
||||
// Note that we do not have to store empty records for the deleted, so don't use to calculate.
|
||||
// TODO(dlc) - This should not be inline, should kick the sync routine.
|
||||
if mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && !isLastBlock {
|
||||
mb.compact()
|
||||
fs.kickFlushStateLoop()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3572,7 +3571,9 @@ func (mb *msgBlock) compact() {
|
||||
}
|
||||
|
||||
buf := mb.cache.buf
|
||||
nbuf := make([]byte, 0, len(buf))
|
||||
nbuf := getMsgBlockBuf(len(buf))
|
||||
// Recycle our nbuf when we are done.
|
||||
defer recycleMsgBlockBuf(nbuf)
|
||||
|
||||
var le = binary.LittleEndian
|
||||
var firstSet bool
|
||||
@@ -3622,9 +3623,16 @@ func (mb *msgBlock) compact() {
|
||||
}
|
||||
|
||||
// Handle compression
|
||||
var err error
|
||||
if nbuf, err = mb.cmp.Compress(nbuf); err != nil {
|
||||
return
|
||||
if mb.cmp != NoCompression {
|
||||
cbuf, err := mb.cmp.Compress(nbuf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
meta := &CompressionInfo{
|
||||
Algorithm: mb.cmp,
|
||||
OriginalSize: uint64(len(nbuf)),
|
||||
}
|
||||
nbuf = append(meta.MarshalMetadata(), cbuf...)
|
||||
}
|
||||
|
||||
// Check for encryption.
|
||||
@@ -4701,6 +4709,24 @@ func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
func (mb *msgBlock) ensureRawBytesLoaded() error {
|
||||
if mb.rbytes > 0 {
|
||||
return nil
|
||||
}
|
||||
f, err := os.Open(mb.mfn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
if fi, err := f.Stat(); fi != nil && err == nil {
|
||||
mb.rbytes = uint64(fi.Size())
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sync msg and index files as needed. This is called from a timer.
|
||||
func (fs *fileStore) syncBlocks() {
|
||||
fs.mu.RLock()
|
||||
@@ -4709,8 +4735,10 @@ func (fs *fileStore) syncBlocks() {
|
||||
return
|
||||
}
|
||||
blks := append([]*msgBlock(nil), fs.blks...)
|
||||
lmb := fs.lmb
|
||||
fs.mu.RUnlock()
|
||||
|
||||
var shouldWriteState bool
|
||||
for _, mb := range blks {
|
||||
// Do actual sync. Hold lock for consistency.
|
||||
mb.mu.Lock()
|
||||
@@ -4722,24 +4750,33 @@ func (fs *fileStore) syncBlocks() {
|
||||
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
|
||||
mb.dirtyCloseWithRemove(false)
|
||||
}
|
||||
// Check if we should compact here as well.
|
||||
// Do not compact last mb.
|
||||
if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.rbytes > mb.bytes {
|
||||
mb.compact()
|
||||
shouldWriteState = true
|
||||
}
|
||||
|
||||
// Check if we need to sync. We will not hold lock during actual sync.
|
||||
var fn string
|
||||
if mb.needSync {
|
||||
needSync := mb.needSync
|
||||
if needSync {
|
||||
// Flush anything that may be pending.
|
||||
if mb.pendingWriteSizeLocked() > 0 {
|
||||
mb.flushPendingMsgsLocked()
|
||||
}
|
||||
fn = mb.mfn
|
||||
mb.needSync = false
|
||||
mb.flushPendingMsgsLocked()
|
||||
}
|
||||
mb.mu.Unlock()
|
||||
|
||||
// Check if we need to sync.
|
||||
// This is done not holding any locks.
|
||||
if fn != _EMPTY_ {
|
||||
if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
|
||||
fd.Sync()
|
||||
if needSync {
|
||||
if fd, _ := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms); fd != nil {
|
||||
canClear := fd.Sync() == nil
|
||||
fd.Close()
|
||||
// Only clear sync flag on success.
|
||||
if canClear {
|
||||
mb.mu.Lock()
|
||||
mb.needSync = false
|
||||
mb.mu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4750,6 +4787,11 @@ func (fs *fileStore) syncBlocks() {
|
||||
syncAlways := fs.fcfg.SyncAlways
|
||||
fs.mu.Unlock()
|
||||
|
||||
// Check if we should write out our state due to compaction of one or more msg blocks.
|
||||
if shouldWriteState {
|
||||
fs.writeFullState()
|
||||
}
|
||||
// Sync state file if we are not running with sync always.
|
||||
if !syncAlways {
|
||||
if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
|
||||
fd.Sync()
|
||||
|
||||
Reference in New Issue
Block a user