diff --git a/bitcask.go b/bitcask.go
index 940fb65..76dfaf3 100644
--- a/bitcask.go
+++ b/bitcask.go
@@ -44,6 +44,10 @@ var (
 	// ErrDatabaseLocked is the error returned if the database is locked
 	// (typically opened by another process)
 	ErrDatabaseLocked = errors.New("error: database locked")
+
+	// ErrMergeInProgress is the error returned if a merge is attempted
+	// while another merge is already in progress
+	ErrMergeInProgress = errors.New("error: merge already in progress")
 )

 // Bitcask is a struct that represents a on-disk LSM and WAL data structure
@@ -62,6 +66,7 @@ type Bitcask struct {
 	trie      art.Tree
 	indexer   index.Indexer
 	metadata  *metadata.MetaData
+	isMerging bool
 }

 // Stats is a struct returned by Stats() on an open Bitcask instance
@@ -123,15 +128,21 @@ func (b *Bitcask) Sync() error {
 	return b.curr.Sync()
 }

-// Get retrieves the value of the given key. If the key is not found or an/I/O
-// error occurs a null byte slice is returned along with the error.
+// Get fetches the value for the given key
 func (b *Bitcask) Get(key []byte) ([]byte, error) {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+
+	return b.get(key)
+}
+
+// get retrieves the value of the given key. If the key is not found or an
+// I/O error occurs, a nil byte slice is returned along with the error.
+func (b *Bitcask) get(key []byte) ([]byte, error) {
 	var df data.Datafile

-	b.mu.RLock()
 	value, found := b.trie.Search(key)
 	if !found {
-		b.mu.RUnlock()
 		return nil, ErrKeyNotFound
 	}

@@ -144,7 +155,6 @@ func (b *Bitcask) Get(key []byte) ([]byte, error) {
 	}

 	e, err := df.ReadAt(item.Offset, item.Size)
-	b.mu.RUnlock()
 	if err != nil {
 		return nil, err
 	}
@@ -338,6 +348,33 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
 	return b.curr.Write(e)
 }

+// closeCurrentFile closes the current datafile and reopens it as read-only.
+func (b *Bitcask) closeCurrentFile() error {
+	err := b.curr.Close()
+	if err != nil {
+		return err
+	}
+	id := b.curr.FileID()
+	df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
+	if err != nil {
+		return err
+	}
+
+	b.datafiles[id] = df
+	return nil
+}
+
+// openNewWritableFile opens a new datafile for writing.
+func (b *Bitcask) openNewWritableFile() error {
+	id := b.curr.FileID() + 1
+	curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
+	if err != nil {
+		return err
+	}
+	b.curr = curr
+	return nil
+}
+
 func (b *Bitcask) Reopen() error {
 	b.mu.Lock()
 	defer b.mu.Unlock()
@@ -367,6 +404,34 @@ func (b *Bitcask) Reopen() error {
 // and deleted keys removes. Duplicate key/value pairs are also removed.
 // Call this function periodically to reclaim disk space.
 func (b *Bitcask) Merge() error {
+	b.mu.Lock()
+	if b.isMerging {
+		b.mu.Unlock()
+		return ErrMergeInProgress
+	}
+	b.isMerging = true
+	b.mu.Unlock()
+	defer func() {
+		b.isMerging = false
+	}()
+	b.mu.RLock()
+	err := b.closeCurrentFile()
+	if err != nil {
+		b.mu.RUnlock()
+		return err
+	}
+	filesToMerge := make([]int, 0, len(b.datafiles))
+	for k := range b.datafiles {
+		filesToMerge = append(filesToMerge, k)
+	}
+	err = b.openNewWritableFile()
+	if err != nil {
+		b.mu.RUnlock()
+		return err
+	}
+	b.mu.RUnlock()
+	sort.Ints(filesToMerge)
+
 	// Temporary merged database path
 	temp, err := ioutil.TempDir(b.path, "merge")
 	if err != nil {
@@ -384,7 +449,12 @@ func (b *Bitcask) Merge() error {
 	// Doing this automatically strips deleted keys and
 	// old key/value pairs
 	err = b.Fold(func(key []byte) error {
-		value, err := b.Get(key)
+		item, _ := b.trie.Search(key)
+		// if the key was updated after the merge started, there is nothing to do
+		if item.(internal.Item).FileID > filesToMerge[len(filesToMerge)-1] {
+			return nil
+		}
+		value, err := b.get(key)
 		if err != nil {
 			return err
 		}
@@ -398,29 +468,33 @@ func (b *Bitcask) Merge() error {
 	if err != nil {
 		return err
 	}
-
-	err = mdb.Close()
-	if err != nil {
+	if err = mdb.Close(); err != nil {
+		return err
+	}
+	if err = b.Close(); err != nil {
 		return err
 	}

-	// Close the database
-	err = b.Close()
-	if err != nil {
-		return err
-	}
-
-	// Remove all data files
+	// Remove only the datafiles that have been merged
 	files, err := ioutil.ReadDir(b.path)
 	if err != nil {
 		return err
 	}
 	for _, file := range files {
-		if !file.IsDir() {
-			err := os.RemoveAll(path.Join([]string{b.path, file.Name()}...))
-			if err != nil {
-				return err
-			}
+		if file.IsDir() {
+			continue
+		}
+		ids, err := internal.ParseIds([]string{file.Name()})
+		if err != nil {
+			return err
+		}
+		// skip datafiles created after the merge started
+		if len(ids) > 0 && ids[0] > filesToMerge[len(filesToMerge)-1] {
+			continue
+		}
+		err = os.RemoveAll(path.Join(b.path, file.Name()))
+		if err != nil {
+			return err
 		}
 	}
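The Get/get split above follows the standard Go idiom for lock-safe re-entry: the exported method owns the locking, while the unexported worker assumes its caller already holds the lock. That is what lets Merge's Fold callback call b.get while the read lock is held, without deadlocking on a re-acquire. A minimal standalone sketch of the idiom, built around a hypothetical Store type rather than bitcask's actual code:

package main

import (
	"fmt"
	"sync"
)

// Store is a hypothetical type used only to illustrate the idiom.
type Store struct {
	mu   sync.RWMutex
	data map[string]string
}

// Get is the exported entry point: it acquires and releases the lock.
func (s *Store) Get(key string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.get(key)
}

// get does the actual lookup and assumes the caller already holds s.mu,
// so internal code paths that hold the lock can call it directly.
func (s *Store) get(key string) (string, bool) {
	v, ok := s.data[key]
	return v, ok
}

func main() {
	s := &Store{data: map[string]string{"foo": "bar"}}
	fmt.Println(s.Get("foo"))
}

The same convention keeps closeCurrentFile and openNewWritableFile free of locking internally; Merge decides which lock to hold around them.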
diff --git a/bitcask_test.go b/bitcask_test.go
index 08890aa..afd7d37 100644
--- a/bitcask_test.go
+++ b/bitcask_test.go
@@ -1034,7 +1034,7 @@ func TestMerge(t *testing.T) {

 	s3, err := db.Stats()
 	assert.NoError(err)
-	assert.Equal(1, s3.Datafiles)
+	assert.Equal(2, s3.Datafiles)
 	assert.Equal(1, s3.Keys)
 	assert.True(s3.Size > s1.Size)
 	assert.True(s3.Size < s2.Size)
@@ -1364,18 +1364,19 @@ func TestMergeErrors(t *testing.T) {
 	assert.NoError(err)
 	defer os.RemoveAll(testdir)

-	db, err := Open(testdir)
+	db, err := Open(testdir, WithMaxDatafileSize(22))
 	assert.NoError(err)

 	assert.NoError(db.Put([]byte("foo"), []byte("bar")))
+	assert.NoError(db.Put([]byte("bar"), []byte("baz")))

 	mockDatafile := new(mocks.Datafile)
-	mockDatafile.On("FileID").Return(0)
+	mockDatafile.On("Close").Return(nil)
 	mockDatafile.On("ReadAt", int64(0), int64(22)).Return(
 		internal.Entry{},
 		ErrMockError,
 	)
-	db.curr = mockDatafile
+	db.datafiles[0] = mockDatafile

 	err = db.Merge()
 	assert.Error(err)
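From a caller's point of view, the new guard turns an overlapping second Merge into a fast, non-blocking failure rather than a queued or racing operation. A minimal sketch of that behaviour, assuming the library's import path github.com/prologic/bitcask and a throwaway database directory; whether the two goroutines actually overlap depends on scheduling:

package main

import (
	"errors"
	"fmt"
	"sync"

	"github.com/prologic/bitcask"
)

func main() {
	db, err := bitcask.Open("/tmp/merge-demo")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Write enough keys that a merge has some work to do.
	for i := 0; i < 100; i++ {
		key := []byte(fmt.Sprintf("key-%d", i))
		if err := db.Put(key, []byte("value")); err != nil {
			panic(err)
		}
	}

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// If the calls overlap, one proceeds and the other
			// returns ErrMergeInProgress instead of blocking.
			switch err := db.Merge(); {
			case errors.Is(err, bitcask.ErrMergeInProgress):
				fmt.Printf("goroutine %d: merge already in progress\n", n)
			case err != nil:
				fmt.Printf("goroutine %d: merge failed: %v\n", n, err)
			default:
				fmt.Printf("goroutine %d: merge completed\n", n)
			}
		}(i)
	}
	wg.Wait()
}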