Optimize for restore time.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-03 13:28:46 -07:00
parent 4a5b76b0e8
commit 2d2bb77f6e
3 changed files with 230 additions and 206 deletions

View File

@@ -433,7 +433,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Check if our prior state remembers a last sequence past what we can currently see.
if fs.ld != nil && prior.LastSeq > fs.state.LastSeq {
fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime
if _, err = fs.newMsgBlockForWrite(); err != nil {
if lmb, err := fs.newMsgBlockForWrite(); err == nil {
lmb.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano())
} else {
return nil, err
}
}
@@ -442,8 +444,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
defer fs.kickFlushStateLoop()
// Also make sure we get rid of old idx and fss files on return.
defer func() {
os.RemoveAll(filepath.Join(fs.fcfg.StoreDir, msgDir, indexScanAll))
os.RemoveAll(filepath.Join(fs.fcfg.StoreDir, msgDir, fssScanAll))
go func() {
os.RemoveAll(filepath.Join(fs.fcfg.StoreDir, msgDir, indexScanAll))
os.RemoveAll(filepath.Join(fs.fcfg.StoreDir, msgDir, fssScanAll))
}()
}()
}
@@ -454,6 +458,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
if len(fs.tombs) > 0 {
for _, seq := range fs.tombs {
fs.removeMsg(seq, false, false, false)
fs.removeFromLostData(seq)
}
// Not needed after this phase.
fs.tombs = nil
@@ -931,7 +936,6 @@ func (mb *msgBlock) ensureLastChecksumLoaded() {
// Lock held on entry
func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) {
mb := fs.initMsgBlock(index)
fs.loadEncryptionForMsgBlock(mb)
// Open up the message file, but we will try to recover from the index file.
// We will check that the last checksums match.
@@ -946,6 +950,10 @@ func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) {
} else {
return nil, err
}
// Make sure encryption loaded if needed.
fs.loadEncryptionForMsgBlock(mb)
// Grab last checksum from main block file.
var lchk [8]byte
if mb.rbytes >= checksumSize {
@@ -1014,15 +1022,50 @@ func (fs *fileStore) addLostData(ld *LostStreamData) {
return
}
if fs.ld != nil {
fs.ld.Msgs = append(fs.ld.Msgs, ld.Msgs...)
msgs := fs.ld.Msgs
sort.Slice(msgs, func(i, j int) bool { return msgs[i] < msgs[j] })
fs.ld.Bytes += ld.Bytes
var added bool
for _, seq := range ld.Msgs {
if _, found := fs.ld.exists(seq); !found {
fs.ld.Msgs = append(fs.ld.Msgs, seq)
added = true
}
}
if added {
msgs := fs.ld.Msgs
sort.Slice(msgs, func(i, j int) bool { return msgs[i] < msgs[j] })
fs.ld.Bytes += ld.Bytes
}
} else {
fs.ld = ld
}
}
// exists reports whether seq is already recorded in ld.Msgs, which is kept
// sorted ascending. It returns the index where seq is (or would be inserted)
// along with the found flag, suitable for use with slice insertion/removal.
func (ld *LostStreamData) exists(seq uint64) (int, bool) {
	cmp := func(i int) int {
		switch tseq := ld.Msgs[i]; {
		case tseq < seq:
			return -1
		case tseq > seq:
			return +1
		default:
			return 0
		}
	}
	return sort.Find(len(ld.Msgs), cmp)
}
// removeFromLostData drops seq from our lost data accounting if present.
// Once no lost messages remain we clear fs.ld entirely so state reports
// no lost data. Lock held on entry per filestore conventions.
func (fs *fileStore) removeFromLostData(seq uint64) {
	ld := fs.ld
	if ld == nil {
		return
	}
	i, found := ld.exists(seq)
	if !found {
		return
	}
	// Remove the entry in place, preserving sorted order.
	ld.Msgs = append(ld.Msgs[:i], ld.Msgs[i+1:]...)
	if len(ld.Msgs) == 0 {
		fs.ld = nil
	}
}
func (fs *fileStore) rebuildState(ld *LostStreamData) {
fs.mu.Lock()
defer fs.mu.Unlock()
@@ -1387,48 +1430,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
return nil, tombstones, nil
}
// Used when we scan the msg blocks.
type blockFiles struct {
	// Set of block file indexes found on disk during the scan.
	blksSeen map[uint32]struct{}
	// Highest block index seen during the scan.
	maxIndex uint32
}
// This will grab all the block files.
// Sends the scan result on ch, or nil if the message directory could not be
// read; the caller treats nil as "no scan available". Intended to run in its
// own goroutine so the directory scan overlaps other recovery work.
func (fs *fileStore) grabMsgBlockFiles(ch chan *blockFiles) {
	f, err := os.Open(filepath.Join(fs.fcfg.StoreDir, msgDir))
	if err != nil {
		ch <- nil
		return
	}
	defer f.Close()
	dirs, err := f.ReadDir(-1)
	if err != nil {
		ch <- nil
		return
	}
	result := &blockFiles{blksSeen: make(map[uint32]struct{})}
	for _, fi := range dirs {
		var index uint32
		// Block files are named per the blkScan pattern; skip anything else.
		if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 {
			result.blksSeen[index] = struct{}{}
			if index > result.maxIndex {
				result.maxIndex = index
			}
		}
	}
	ch <- result
}
// recoverFullState will attempt to recover our last full state and re-process any state changes
// that happened afterwards.
func (fs *fileStore) recoverFullState() (rerr error) {
// Grab all the msgBlock files in parallel in case there are many.
rch := make(chan *blockFiles, 1)
go fs.grabMsgBlockFiles(rch)
fs.mu.Lock()
defer fs.mu.Unlock()
@@ -1438,6 +1442,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if _, err := os.Stat(pdir); err == nil {
os.RemoveAll(pdir)
}
// Grab our stream state file and load it in.
fn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
buf, err := os.ReadFile(fn)
@@ -1586,54 +1591,42 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return errCorruptState
}
// Grab the max blk index we see from scanning the directory. The full snapshot has the index that was lmb when
// we created it, so with that and max we know blocks to process. We do this in parallel in case there are lots of blks.
blkFiles := <-rch
defer func() {
// Make sure we saw all of our blk files.
for _, mb := range fs.blks {
if _, ok := blkFiles.blksSeen[mb.index]; !ok {
if ld, _, _ := mb.rebuildState(); ld != nil {
// If we have lost data make sure we track here.
fs.addLostData(ld)
rerr = errCorruptState
}
}
}
}()
// Move into place our state, msgBlks and subject info.
fs.state = state
// If our saved state is past what we see on disk, fallback and rebuild.
if blkFiles != nil && blkFiles.maxIndex < blkIndex {
return errPriorState
}
// First let's check the happy path, open the blk file that was the lmb when we created the full state.
// See if we have the last block available.
var matched bool
var mb *msgBlock
if mb = fs.bim[blkIndex]; mb != nil {
matched = bytes.Equal(mb.lastChecksum(), lchk[:])
if matched && blkIndex == blkFiles.maxIndex {
return nil
if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) {
// If our saved state is past what we see on disk, fallback and rebuild.
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
}
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
}
// If we are here we did not match the happy path.
// We need to go through and find our checksum. This should be in blkIndex, but might not be.
start, stop := blkIndex, blkFiles.maxIndex
// We may need to check other blocks. Even if we matched last checksum we will see if there is another block.
// If we did not match we re-process the last block.
start := blkIndex
if matched {
start++
}
for bi := start; bi <= stop; bi++ {
for bi := start; ; bi++ {
nmb, err := fs.recoverMsgBlock(bi)
if err != nil {
if os.IsNotExist(err) {
return nil
}
os.Remove(fn)
return err
}
if nmb != nil {
@@ -1656,8 +1649,6 @@ func (fs *fileStore) recoverFullState() (rerr error) {
fs.state.Bytes += nmb.bytes
}
}
return nil
}
// adjustAccounting will be called when a stream state was only partially accounted for
@@ -4237,6 +4228,8 @@ func (fs *fileStore) checkMsgs() *LostStreamData {
fs.psim = make(map[string]*psi)
for _, mb := range fs.blks {
// Make sure encryption loaded if needed for the block.
fs.loadEncryptionForMsgBlock(mb)
// FIXME(dlc) - check tombstones here too?
if ld, _, err := mb.rebuildState(); err != nil && ld != nil {
// Rebuild fs state too.
@@ -5029,6 +5022,9 @@ func (mb *msgBlock) cacheNotLoaded() bool {
func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) {
f, err := os.Open(mb.mfn)
if err != nil {
if os.IsNotExist(err) {
err = errNoBlkData
}
return nil, err
}
defer f.Close()
@@ -5117,6 +5113,12 @@ checkCache:
// We want to hold the mb lock here to avoid any changes to state.
buf, err := mb.loadBlock(nil)
if err != nil {
if err == errNoBlkData {
if ld, _, err := mb.rebuildStateLocked(); err != nil && ld != nil {
// Rebuild fs state too.
go mb.fs.rebuildState(ld)
}
}
return err
}
@@ -5197,6 +5199,7 @@ var (
errMsgBlkTooBig = errors.New("message block size exceeded int capacity")
errUnknownCipher = errors.New("unknown cipher")
errNoMainKey = errors.New("encrypted store encountered with no main key")
errNoBlkData = errors.New("message block data missing")
)
const (
@@ -6722,7 +6725,6 @@ func (fs *fileStore) Delete() error {
}
return ErrStoreClosed
}
fs.Purge()
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
// If purge directory still exists then we need to wait
@@ -6731,6 +6733,9 @@ func (fs *fileStore) Delete() error {
os.RemoveAll(pdir)
}
// Do Purge() since if we have lots of blocks uses a mv/rename.
fs.Purge()
if err := fs.Stop(); err != nil {
return err
}

View File

@@ -4764,6 +4764,8 @@ func TestFileStoreMsgBlkFailOnKernelFaultLostDataReporting(t *testing.T) {
// We want to make sure all of the scenarios report lost data properly.
// Will run 3 scenarios, 1st block, last block, interior block.
// The new system does not detect byzantine behavior by default on creating the store.
// A LoadMsg() or checkMsgs() call will be needed now.
// First block
fs.mu.RLock()
@@ -4780,6 +4782,17 @@ func TestFileStoreMsgBlkFailOnKernelFaultLostDataReporting(t *testing.T) {
require_NoError(t, err)
defer fs.Stop()
_, err = fs.LoadMsg(1, nil)
require_Error(t, err, errNoBlkData)
// Load will rebuild fs state itself asynchronously.
checkFor(t, time.Second, 50*time.Millisecond, func() error {
if state := fs.State(); state.Lost != nil {
return nil
}
return errors.New("no ld yet")
})
state := fs.State()
require_True(t, state.FirstSeq == 94)
require_True(t, state.Lost != nil)
@@ -4823,6 +4836,9 @@ func TestFileStoreMsgBlkFailOnKernelFaultLostDataReporting(t *testing.T) {
require_NoError(t, err)
defer fs.Stop()
// Need checkMsgs to catch interior one.
require_True(t, fs.checkMsgs() != nil)
state = fs.State()
require_True(t, state.FirstSeq == 94)
require_True(t, state.LastSeq == 500) // Make sure we do not lose last seq.
@@ -5251,6 +5267,131 @@ func TestFileStoreKeepWithDeletedMsgsBug(t *testing.T) {
require_True(t, n == 3)
}
// TestFileStoreRestartWithExpireAndLockingBug verifies that a filestore can
// be reopened cleanly after its stream limits were tightened on disk, which
// forces expirations to run during startup recovery.
func TestFileStoreRestartWithExpireAndLockingBug(t *testing.T) {
	storeDir := t.TempDir()
	scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}
	fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir}, scfg)
	require_NoError(t, err)
	defer fs.Stop()

	// Store 20 messages total, alternating between two subjects.
	payload := []byte("HELLO WORLD")
	for i := 0; i < 10; i++ {
		for _, subj := range []string{"A", "B"} {
			fs.StoreMsg(subj, nil, payload)
		}
	}
	fs.Stop()

	// Tighten the limits underneath the store so expirations run at startup.
	scfg.MaxMsgs = 15
	scfg.MaxMsgsPer = 2
	fs.cfg = FileStreamInfo{Created: fs.cfg.Created, StreamConfig: scfg}
	require_NoError(t, fs.writeStreamMeta())

	fs, err = newFileStore(FileStoreConfig{StoreDir: storeDir}, scfg)
	require_NoError(t, err)
	defer fs.Stop()
}
// Test that loads from lmb under lots of writes do not return errPartialCache.
func TestFileStoreErrPartialLoad(t *testing.T) {
	fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	put := func(num int) {
		for i := 0; i < num; i++ {
			fs.StoreMsg("Z", nil, []byte("ZZZZZZZZZZZZZ"))
		}
	}
	put(100)

	// Dump cache of lmb.
	clearCache := func() {
		fs.mu.RLock()
		lmb := fs.lmb
		fs.mu.RUnlock()
		lmb.mu.Lock()
		lmb.clearCache()
		lmb.mu.Unlock()
	}
	clearCache()

	qch := make(chan struct{})
	defer close(qch)

	// Hammer the store with concurrent writes while we load below.
	for i := 0; i < 10; i++ {
		go func() {
			for {
				select {
				case <-qch:
					return
				default:
					put(5)
				}
			}
		}()
	}

	time.Sleep(100 * time.Millisecond)

	var smv StoreMsg
	for i := 0; i < 10_000; i++ {
		fs.mu.RLock()
		lmb := fs.lmb
		fs.mu.RUnlock()
		lmb.mu.Lock()
		// Use the captured lmb here: re-reading fs.lmb while holding only
		// the block lock would race with concurrent writers swapping in a
		// new last block under fs.mu.
		first, last := lmb.first.seq, lmb.last.seq
		if i%100 == 0 {
			lmb.clearCache()
		}
		lmb.mu.Unlock()
		if spread := int(last - first); spread > 0 {
			seq := first + uint64(rand.Intn(spread))
			_, err = fs.LoadMsg(seq, &smv)
			require_NoError(t, err)
		}
	}
}
// TestFileStoreErrPartialLoadOnSyncClose simulates the sync timer closing
// and removing the last message block, then verifies that a subsequent store
// and load still succeed without a partial-cache error.
func TestFileStoreErrPartialLoadOnSyncClose(t *testing.T) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 500},
		StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage},
	)
	require_NoError(t, err)
	defer fs.Stop()

	// A 19 byte payload yields an internal record length of 50 bytes,
	// so 10 msgs fit per 500 byte block.
	const msgLen = 19
	msg := bytes.Repeat([]byte("A"), msgLen)

	// Fill half of the first block.
	for _, subj := range []string{"A", "B", "C", "D", "E"} {
		fs.StoreMsg(subj, nil, msg)
	}

	// Simulate the sync timer closing the last block.
	fs.mu.RLock()
	lmb := fs.lmb
	fs.mu.RUnlock()
	require_True(t, lmb != nil)

	lmb.mu.Lock()
	lmb.expireCacheLocked()
	lmb.dirtyCloseWithRemove(false)
	lmb.mu.Unlock()

	fs.StoreMsg("Z", nil, msg)
	_, err = fs.LoadMsg(1, nil)
	require_NoError(t, err)
}
///////////////////////////////////////////////////////////////////////////
// New WAL based architecture tests
///////////////////////////////////////////////////////////////////////////
@@ -5689,35 +5830,6 @@ func TestFileStoreFullStateTestSysRemovals(t *testing.T) {
})
}
// Verifies a filestore can be reopened after its stream limits were
// tightened on disk, forcing expirations during startup recovery.
func TestFileStoreRestartWithExpireAndLockingBug(t *testing.T) {
	sd := t.TempDir()
	scfg := StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage}
	fs, err := newFileStore(FileStoreConfig{StoreDir: sd}, scfg)
	require_NoError(t, err)
	defer fs.Stop()
	// 20 total messages, alternating subjects A and B.
	msg := []byte("HELLO WORLD")
	for i := 0; i < 10; i++ {
		fs.StoreMsg("A", nil, msg)
		fs.StoreMsg("B", nil, msg)
	}
	fs.Stop()
	// Now change the config underneath the store so expirations run at startup.
	scfg.MaxMsgs = 15
	scfg.MaxMsgsPer = 2
	newCfg := FileStreamInfo{Created: fs.cfg.Created, StreamConfig: scfg}
	// Replace the stream meta on disk with the tightened config.
	fs.cfg = newCfg
	require_NoError(t, fs.writeStreamMeta())
	fs, err = newFileStore(FileStoreConfig{StoreDir: sd}, scfg)
	require_NoError(t, err)
	defer fs.Stop()
}
///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
@@ -5754,99 +5866,3 @@ func Benchmark_FileStoreSelectMsgBlock(b *testing.B) {
}
b.StopTimer()
}
// Test that loads from lmb under lots of writes do not return errPartialCache.
func TestFileStoreErrPartialLoad(t *testing.T) {
	fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()
	put := func(num int) {
		for i := 0; i < num; i++ {
			fs.StoreMsg("Z", nil, []byte("ZZZZZZZZZZZZZ"))
		}
	}
	put(100)
	// Dump cache of lmb.
	clearCache := func() {
		fs.mu.RLock()
		lmb := fs.lmb
		fs.mu.RUnlock()
		lmb.mu.Lock()
		lmb.clearCache()
		lmb.mu.Unlock()
	}
	clearCache()
	qch := make(chan struct{})
	defer close(qch)
	// Hammer the store with concurrent writers while we load below.
	for i := 0; i < 10; i++ {
		go func() {
			for {
				select {
				case <-qch:
					return
				default:
					put(5)
				}
			}
		}()
	}
	time.Sleep(100 * time.Millisecond)
	var smv StoreMsg
	for i := 0; i < 10_000; i++ {
		fs.mu.RLock()
		lmb := fs.lmb
		fs.mu.RUnlock()
		lmb.mu.Lock()
		// NOTE(review): this re-reads fs.lmb while holding only the block
		// lock — looks racy with concurrent writers; the captured lmb was
		// likely intended here. Confirm against the race detector.
		first, last := fs.lmb.first.seq, fs.lmb.last.seq
		if i%100 == 0 {
			lmb.clearCache()
		}
		lmb.mu.Unlock()
		if spread := int(last - first); spread > 0 {
			seq := first + uint64(rand.Intn(spread))
			_, err = fs.LoadMsg(seq, &smv)
			require_NoError(t, err)
		}
	}
}
// Simulates the sync timer closing/removing the last message block, then
// verifies that a subsequent store and load still succeed.
func TestFileStoreErrPartialLoadOnSyncClose(t *testing.T) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 500},
		StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage},
	)
	require_NoError(t, err)
	defer fs.Stop()
	// This yields an internal record length of 50 bytes. So 10 msgs per blk.
	msgLen := 19
	msg := bytes.Repeat([]byte("A"), msgLen)
	// Load up half the block.
	for _, subj := range []string{"A", "B", "C", "D", "E"} {
		fs.StoreMsg(subj, nil, msg)
	}
	// Now simulate the sync timer closing the last block.
	fs.mu.RLock()
	lmb := fs.lmb
	fs.mu.RUnlock()
	require_True(t, lmb != nil)
	lmb.mu.Lock()
	lmb.expireCacheLocked()
	lmb.dirtyCloseWithRemove(false)
	lmb.mu.Unlock()
	// Store and load should both succeed after the block was removed.
	fs.StoreMsg("Z", nil, msg)
	_, err = fs.LoadMsg(1, nil)
	require_NoError(t, err)
}

View File

@@ -20356,6 +20356,9 @@ func TestJetStreamMsgBlkFailOnKernelFault(t *testing.T) {
nc, js = jsClientConnect(t, s)
defer nc.Close()
_, err = js.GetMsg("TEST", 17)
require_Error(t, err, nats.ErrMsgNotFound)
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
require_True(t, si.State.NumDeleted == 3)