
new merge approach (#191)

* new merge approach

* code refactor

* comment added

* isMerging flag added to allow one merge operation at a time

* get API modified; merge updated (no recursive read locks)

Co-authored-by: yash <yash.chandra@grabpay.com>
Co-authored-by: James Mills <prologic@shortcircuit.net.au>
Yash Suresh Chandra 2020-12-11 16:18:41 +05:30 committed by GitHub
parent 80c06a3572
commit e1cdffd8f1
2 changed files with 100 additions and 25 deletions

bitcask.go

@@ -44,6 +44,10 @@ var (
     // ErrDatabaseLocked is the error returned if the database is locked
     // (typically opened by another process)
     ErrDatabaseLocked = errors.New("error: database locked")
+
+    // ErrMergeInProgress is the error returned if merge is called when already a merge
+    // is in progress
+    ErrMergeInProgress = errors.New("error: merge already in progress")
 )
 
 // Bitcask is a struct that represents an on-disk LSM and WAL data structure
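For context, the new sentinel lets callers tell a busy merge apart from a real failure. A minimal caller-side sketch, assuming the package is imported from github.com/prologic/bitcask (the upstream of this mirror) and that simply skipping the attempt is acceptable:

    package main

    import (
        "log"

        "github.com/prologic/bitcask"
    )

    func main() {
        db, err := bitcask.Open("/tmp/merge-demo")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // Only one merge may run at a time; a second call returns
        // ErrMergeInProgress rather than blocking.
        if err := db.Merge(); err == bitcask.ErrMergeInProgress {
            log.Println("merge skipped: one is already running")
        } else if err != nil {
            log.Fatal(err)
        }
    }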
@@ -62,6 +66,7 @@ type Bitcask struct {
     trie      art.Tree
     indexer   index.Indexer
     metadata  *metadata.MetaData
+    isMerging bool
 }
 
 // Stats is a struct returned by Stats() on an open Bitcask instance
@@ -123,15 +128,21 @@ func (b *Bitcask) Sync() error {
     return b.curr.Sync()
 }
 
-// Get retrieves the value of the given key. If the key is not found or an I/O
-// error occurs a null byte slice is returned along with the error.
+// Get fetches the value for the given key
 func (b *Bitcask) Get(key []byte) ([]byte, error) {
+    b.mu.RLock()
+    defer b.mu.RUnlock()
+    return b.get(key)
+}
+
+// get retrieves the value of the given key. If the key is not found or an I/O
+// error occurs a null byte slice is returned along with the error.
+func (b *Bitcask) get(key []byte) ([]byte, error) {
     var df data.Datafile
 
-    b.mu.RLock()
     value, found := b.trie.Search(key)
     if !found {
-        b.mu.RUnlock()
         return nil, ErrKeyNotFound
     }
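The Get/get split is the standard exported-wrapper idiom: the exported method takes the read lock exactly once, and internal callers that already hold it (such as the merge loop below) call the unexported worker directly, so the sync.RWMutex is never acquired recursively. The shape of the pattern on a toy type, not the bitcask API:

    package lockdemo

    import "sync"

    type store struct {
        mu   sync.RWMutex
        data map[string][]byte
    }

    // Get is the public entry point; it owns the locking.
    func (s *store) Get(key string) ([]byte, bool) {
        s.mu.RLock()
        defer s.mu.RUnlock()
        return s.get(key)
    }

    // get assumes s.mu is already held by the caller, so code that
    // holds the lock across many lookups can call it directly.
    func (s *store) get(key string) ([]byte, bool) {
        v, ok := s.data[key]
        return v, ok
    }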
@@ -144,7 +155,6 @@ func (b *Bitcask) Get(key []byte) ([]byte, error) {
     }
 
     e, err := df.ReadAt(item.Offset, item.Size)
-    b.mu.RUnlock()
     if err != nil {
         return nil, err
     }
@@ -338,6 +348,33 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
     return b.curr.Write(e)
 }
 
+// closeCurrentFile closes current datafile and makes it read only.
+func (b *Bitcask) closeCurrentFile() error {
+    err := b.curr.Close()
+    if err != nil {
+        return err
+    }
+
+    id := b.curr.FileID()
+    df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
+    if err != nil {
+        return err
+    }
+
+    b.datafiles[id] = df
+    return nil
+}
+
+// openNewWritableFile opens new datafile for writing data
+func (b *Bitcask) openNewWritableFile() error {
+    id := b.curr.FileID() + 1
+    curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
+    if err != nil {
+        return err
+    }
+
+    b.curr = curr
+    return nil
+}
+
 func (b *Bitcask) Reopen() error {
     b.mu.Lock()
     defer b.mu.Unlock()
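Together these two helpers rotate the head datafile: the current file is frozen read-only under its existing ID, and a fresh file with the next sequential ID takes over writes. Every entry written after the rotation therefore carries a FileID strictly greater than any file the merge will touch. A toy sketch of that invariant, not the bitcask types:

    package rotatedemo

    type segmentLog struct {
        headID   int
        readOnly map[int]bool
    }

    // rotate mirrors closeCurrentFile followed by openNewWritableFile:
    // freeze the head under its ID, then write to the next ID.
    func (l *segmentLog) rotate() {
        l.readOnly[l.headID] = true // same ID, now read-only
        l.headID++                  // all new writes land here
    }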
@@ -367,6 +404,34 @@ func (b *Bitcask) Reopen() error {
 // and deleted keys removed. Duplicate key/value pairs are also removed.
 // Call this function periodically to reclaim disk space.
 func (b *Bitcask) Merge() error {
+    b.mu.Lock()
+    if b.isMerging {
+        b.mu.Unlock()
+        return ErrMergeInProgress
+    }
+    b.isMerging = true
+    b.mu.Unlock()
+
+    defer func() {
+        b.isMerging = false
+    }()
+
+    b.mu.RLock()
+    err := b.closeCurrentFile()
+    if err != nil {
+        b.mu.RUnlock()
+        return err
+    }
+
+    filesToMerge := make([]int, 0, len(b.datafiles))
+    for k := range b.datafiles {
+        filesToMerge = append(filesToMerge, k)
+    }
+
+    err = b.openNewWritableFile()
+    if err != nil {
+        b.mu.RUnlock()
+        return err
+    }
+    b.mu.RUnlock()
+
+    sort.Ints(filesToMerge)
+
     // Temporary merged database path
     temp, err := ioutil.TempDir(b.path, "merge")
     if err != nil {
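The isMerging handling is a check-and-set guard: test and flip the flag while holding the mutex, do the long-running work unlocked, then clear the flag. The idiom in isolation, on a hypothetical type; note the sketch reacquires the mutex before clearing the flag, whereas the deferred b.isMerging = false above writes without holding b.mu, which the race detector could report if another goroutine checks the flag concurrently:

    package guarddemo

    import (
        "errors"
        "sync"
    )

    var errBusy = errors.New("operation already in progress")

    type guard struct {
        mu      sync.Mutex
        running bool
    }

    // run executes work unless another call is already in flight.
    func (g *guard) run(work func() error) error {
        g.mu.Lock()
        if g.running {
            g.mu.Unlock()
            return errBusy
        }
        g.running = true
        g.mu.Unlock()

        defer func() {
            g.mu.Lock() // clear under the lock to stay race-free
            g.running = false
            g.mu.Unlock()
        }()

        return work()
    }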
@@ -384,7 +449,12 @@ func (b *Bitcask) Merge() error {
     // Doing this automatically strips deleted keys and
     // old key/value pairs
     err = b.Fold(func(key []byte) error {
-        value, err := b.Get(key)
+        item, _ := b.trie.Search(key)
+        // if key was updated after start of merge operation, nothing to do
+        if item.(internal.Item).FileID > filesToMerge[len(filesToMerge)-1] {
+            return nil
+        }
+        value, err := b.get(key)
         if err != nil {
             return err
         }
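The FileID comparison is the heart of the new approach: filesToMerge is sorted ascending, so its last element is the newest datafile that existed when the merge began, and any key whose live entry sits in a newer file was written mid-merge and needs no copying. As an illustrative helper (the diff inlines this test in the Fold callback):

    // needsRewrite reports whether a key's live entry must be copied into
    // the merged database. fileID is where the entry currently lives and
    // filesToMerge is the sorted snapshot taken when the merge started.
    func needsRewrite(fileID int, filesToMerge []int) bool {
        return fileID <= filesToMerge[len(filesToMerge)-1]
    }

With filesToMerge = []int{0, 1, 2}, an entry living in file 3 is left alone while entries in files 0 through 2 are rewritten into the temporary merged database.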
@@ -398,29 +468,33 @@ func (b *Bitcask) Merge() error {
     if err != nil {
         return err
     }
-    err = mdb.Close()
-    if err != nil {
+    if err = mdb.Close(); err != nil {
         return err
     }
+    if err = b.Close(); err != nil {
+        return err
+    }
 
-    // Close the database
-    err = b.Close()
-    if err != nil {
-        return err
-    }
-
-    // Remove all data files
+    // Remove data files
     files, err := ioutil.ReadDir(b.path)
     if err != nil {
         return err
     }
     for _, file := range files {
-        if !file.IsDir() {
-            err := os.RemoveAll(path.Join([]string{b.path, file.Name()}...))
-            if err != nil {
-                return err
-            }
-        }
+        if file.IsDir() {
+            continue
+        }
+        ids, err := internal.ParseIds([]string{file.Name()})
+        if err != nil {
+            return err
+        }
+        // if datafile was created after start of merge, skip
+        if len(ids) > 0 && ids[0] > filesToMerge[len(filesToMerge)-1] {
+            continue
+        }
+        err = os.RemoveAll(path.Join(b.path, file.Name()))
+        if err != nil {
+            return err
+        }
     }
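The same snapshot bound protects the deletion pass: only datafiles whose IDs were captured when the merge started are removed, so a file created by openNewWritableFile (or any later rotation) survives with its mid-merge writes intact. A sketch of the filter, with onDiskIDs standing in for the parsed directory listing:

    // removableIDs keeps only datafile IDs that existed in the merge
    // snapshot; anything newer holds writes made during the merge.
    func removableIDs(onDiskIDs []int, lastMergedID int) []int {
        var out []int
        for _, id := range onDiskIDs {
            if id > lastMergedID {
                continue // created mid-merge; must survive
            }
            out = append(out, id)
        }
        return out
    }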

bitcask_test.go

@@ -1034,7 +1034,7 @@ func TestMerge(t *testing.T) {
     s3, err := db.Stats()
     assert.NoError(err)
-    assert.Equal(1, s3.Datafiles)
+    assert.Equal(2, s3.Datafiles)
     assert.Equal(1, s3.Keys)
     assert.True(s3.Size > s1.Size)
     assert.True(s3.Size < s2.Size)
@@ -1364,18 +1364,19 @@ func TestMergeErrors(t *testing.T) {
     assert.NoError(err)
     defer os.RemoveAll(testdir)
 
-    db, err := Open(testdir)
+    db, err := Open(testdir, WithMaxDatafileSize(22))
     assert.NoError(err)
 
     assert.NoError(db.Put([]byte("foo"), []byte("bar")))
     assert.NoError(db.Put([]byte("bar"), []byte("baz")))
 
     mockDatafile := new(mocks.Datafile)
     mockDatafile.On("FileID").Return(0)
+    mockDatafile.On("Close").Return(nil)
     mockDatafile.On("ReadAt", int64(0), int64(22)).Return(
         internal.Entry{},
         ErrMockError,
     )
-    db.curr = mockDatafile
+    db.datafiles[0] = mockDatafile
 
     err = db.Merge()
     assert.Error(err)