Increased purge performance for very large streams

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2020-03-22 12:23:35 -07:00
parent df774e44b0
commit f2ece0515c
2 changed files with 72 additions and 12 deletions

View File

@@ -121,6 +121,8 @@ const (
streamsDir = "streams"
// This is where we keep the message store blocks.
msgDir = "msgs"
// This is where we temporarily move the messages dir.
purgeDir = "__msgs__"
// used to scan blk file names.
blkScan = "%d.blk"
// used to scan index file names.
@@ -273,6 +275,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64) uint64 {
}
// Write out meta and the checksum.
// Lock should be held.
func (fs *fileStore) writeStreamMeta() error {
meta := path.Join(fs.fcfg.StoreDir, JetStreamMetaFile)
if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) {
@@ -401,6 +404,12 @@ func (fs *fileStore) recoverMsgs() error {
fs.mu.Lock()
defer fs.mu.Unlock()
// Check for any left over purged messages.
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
if _, err := os.Stat(pdir); err == nil {
go os.RemoveAll(pdir)
}
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
fis, err := ioutil.ReadDir(mdir)
if err != nil {
@@ -1611,22 +1620,25 @@ func (fs *fileStore) Purge() uint64 {
fs.mu.Lock()
fs.flushPendingWrites()
purged := fs.state.Msgs
cb := fs.scb
bytes := int64(fs.state.Bytes)
fs.state.FirstSeq = fs.state.LastSeq + 1
fs.state.Bytes = 0
fs.state.Msgs = 0
fs.writeStreamMeta()
blks := fs.blks
lmb := fs.lmb
fs.blks = nil
fs.lmb = nil
for _, mb := range blks {
mb.mu.Lock()
fs.removeMsgBlock(mb)
mb.mu.Unlock()
}
// Move the msgs directory out of the way, will delete out of band.
// FIXME(dlc) - These can error and we need to change api.
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
os.Rename(mdir, pdir)
os.MkdirAll(mdir, 0755)
go os.RemoveAll(pdir)
// Now place new write msg block with correct info.
fs.newMsgBlockForWrite()
if lmb != nil {
@@ -1635,6 +1647,8 @@ func (fs *fileStore) Purge() uint64 {
fs.lmb.last = lmb.last
fs.lmb.writeIndexInfo()
}
cb := fs.scb
fs.mu.Unlock()
if cb != nil {

View File

@@ -366,7 +366,7 @@ func TestFileStorePurge(t *testing.T) {
t.Fatalf("Expected to have exactly 1 empty msg block, got %d", numBlocks)
}
checkPurgeState := func() {
checkPurgeState := func(stored uint64) {
t.Helper()
state = fs.State()
if state.Msgs != 0 {
@@ -375,14 +375,14 @@ func TestFileStorePurge(t *testing.T) {
if state.Bytes != 0 {
t.Fatalf("Expected 0 bytes after purge, got %d", state.Bytes)
}
if state.LastSeq != toStore {
if state.LastSeq != stored {
t.Fatalf("Expected LastSeq to be %d., got %d", toStore, state.LastSeq)
}
if state.FirstSeq != toStore+1 {
if state.FirstSeq != stored+1 {
t.Fatalf("Expected FirstSeq to be %d., got %d", toStore+1, state.FirstSeq)
}
}
checkPurgeState()
checkPurgeState(toStore)
// Make sure we recover same state.
fs.Stop()
@@ -397,7 +397,53 @@ func TestFileStorePurge(t *testing.T) {
t.Fatalf("Expected to have exactly 1 empty msg block, got %d", numBlocks)
}
checkPurgeState()
checkPurgeState(toStore)
// Now make sure we clean up any dangling purged messages.
for i := uint64(0); i < toStore; i++ {
fs.StoreMsg(subj, msg)
}
state = fs.State()
if state.Msgs != toStore {
t.Fatalf("Expected %d msgs, got %d", toStore, state.Msgs)
}
if state.Bytes != storedMsgSize*toStore {
t.Fatalf("Expected bytes to be %d, got %d", storedMsgSize*toStore, state.Bytes)
}
// We will simulate crashing before the purge directory is cleared.
mdir := path.Join(storeDir, msgDir)
pdir := path.Join(fs.fcfg.StoreDir, "ptest")
os.Rename(mdir, pdir)
os.MkdirAll(mdir, 0755)
fs.Purge()
checkPurgeState(toStore * 2)
// Make sure we recover same state.
fs.Stop()
purgeDir := path.Join(fs.fcfg.StoreDir, purgeDir)
os.Rename(pdir, purgeDir)
fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 64 * 1024}, StreamConfig{Name: "zzz", Storage: FileStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer fs.Stop()
if numBlocks := fs.numMsgBlocks(); numBlocks != 1 {
t.Fatalf("Expected to have exactly 1 empty msg block, got %d", numBlocks)
}
checkPurgeState(toStore * 2)
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if _, err := os.Stat(purgeDir); err == nil {
return fmt.Errorf("purge directory still present")
}
return nil
})
}
func TestFileStoreRemovePartialRecovery(t *testing.T) {