From f4357e6f18e272d0fcab1d373a1d7d18a8b6425d Mon Sep 17 00:00:00 2001
From: yashschandra
Date: Mon, 30 Nov 2020 03:19:02 +0530
Subject: [PATCH] local live backup support (#185)

* live backup first commit

* exclude lock file in backup

* create path if not exist for backup

Co-authored-by: yash
Co-authored-by: James Mills
---
 bitcask.go                    | 143 +++++++++++++++++++++++++---------
 bitcask_test.go               |  10 ++-
 internal/metadata/metadata.go |  30 +++++++
 internal/utils.go             |  65 ++++++++++++++++
 internal/utils_test.go        | 116 +++++++++++++++++++++++++++
 5 files changed, 331 insertions(+), 33 deletions(-)
 create mode 100644 internal/metadata/metadata.go
 create mode 100644 internal/utils_test.go

diff --git a/bitcask.go b/bitcask.go
index 2ed5567..8d04596 100644
--- a/bitcask.go
+++ b/bitcask.go
@@ -18,6 +18,7 @@ import (
 	"github.com/prologic/bitcask/internal/config"
 	"github.com/prologic/bitcask/internal/data"
 	"github.com/prologic/bitcask/internal/index"
+	"github.com/prologic/bitcask/internal/metadata"
 )
 
 var (
@@ -59,6 +60,7 @@ type Bitcask struct {
 	datafiles map[int]data.Datafile
 	trie      art.Tree
 	indexer   index.Indexer
+	metadata  *metadata.MetaData
 }
 
 // Stats is a struct returned by Stats() on an open Bitcask instance
@@ -95,7 +97,7 @@ func (b *Bitcask) Close() error {
 		os.Remove(b.Flock.Path())
 	}()
 
-	if err := b.indexer.Save(b.trie, filepath.Join(b.path, "index")); err != nil {
+	if err := b.saveIndex(); err != nil {
 		return err
 	}
 
@@ -183,6 +185,14 @@ func (b *Bitcask) Put(key, value []byte) error {
 		}
 	}
 
+	if b.metadata.IndexUpToDate {
+		b.metadata.IndexUpToDate = false
+		if err := b.metadata.Save(filepath.Join(b.path, "meta.json"), b.config.FileFileModeBeforeUmask); err != nil {
+			b.mu.Unlock()
+			return err
+		}
+	}
+
 	item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
 	b.trie.Insert(key, item)
 	b.mu.Unlock()
@@ -302,6 +312,10 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
 			return -1, 0, err
 		}
 		b.curr = curr
+		err = b.saveIndex()
+		if err != nil {
+			return -1, 0, err
+		}
 	}
 
 	e := internal.NewEntry(key, value)
@@ -316,7 +330,7 @@ func (b *Bitcask) Reopen() error {
 	if err != nil {
 		return err
 	}
-	t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles)
+	t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles, lastID, b.metadata.IndexUpToDate)
 	if err != nil {
 		return err
 	}
@@ -418,8 +432,9 @@ func (b *Bitcask) Merge() error {
 // configuration options as functions.
 func Open(path string, options ...Option) (*Bitcask, error) {
 	var (
-		cfg *config.Config
-		err error
+		cfg  *config.Config
+		err  error
+		meta *metadata.MetaData
 	)
 
 	configPath := filepath.Join(path, "config.json")
@@ -442,12 +457,18 @@ func Open(path string, options ...Option) (*Bitcask, error) {
 		return nil, err
 	}
 
+	meta, err = loadMetadata(path)
+	if err != nil {
+		return nil, err
+	}
+
 	bitcask := &Bitcask{
-		Flock:   flock.New(filepath.Join(path, "lock")),
-		config:  cfg,
-		options: options,
-		path:    path,
-		indexer: index.NewIndexer(),
+		Flock:    flock.New(filepath.Join(path, "lock")),
+		config:   cfg,
+		options:  options,
+		path:     path,
+		indexer:  index.NewIndexer(),
+		metadata: meta,
 	}
 
 	locked, err := bitcask.Flock.TryLock()
@@ -475,6 +496,33 @@ func Open(path string, options ...Option) (*Bitcask, error) {
 	return bitcask, nil
 }
 
+// Backup copies the database directory to path, excluding the lock file.
+// The destination directory is created if it does not exist.
+func (b *Bitcask) Backup(path string) error {
+	if !internal.Exists(path) {
+		if err := os.MkdirAll(path, b.config.DirFileModeBeforeUmask); err != nil {
+			return err
+		}
+	}
+	return internal.Copy(b.path, path, []string{"lock"})
+}
+
+// saveIndex writes the in-memory index to a temporary file, renames it over
+// the "index" file so a partially written index never replaces a complete
+// one, and then records in meta.json that the index is up to date.
+func (b *Bitcask) saveIndex() error {
+	tempIdx := "temp_index"
+	if err := b.indexer.Save(b.trie, filepath.Join(b.path, tempIdx)); err != nil {
+		return err
+	}
+	err := os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, "index"))
+	if err != nil {
+		return err
+	}
+	b.metadata.IndexUpToDate = true
+	return b.metadata.Save(filepath.Join(b.path, "meta.json"), b.config.FileFileModeBeforeUmask)
+}
+
 func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64, fileModeBeforeUmask os.FileMode) (datafiles map[int]data.Datafile, lastID int, err error) {
 	fns, err := internal.GetDatafiles(path)
 	if err != nil {
@@ -513,34 +561,65 @@ func getSortedDatafiles(datafiles map[int]data.Datafile) []data.Datafile {
 	return out
 }
 
-func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile) (art.Tree, error) {
+// loadIndex loads the index stored at path. An index that is found and marked
+// up to date is returned as is; one that is found but stale is brought up to
+// date by replaying the datafile with id lastID; otherwise the index is
+// rebuilt by replaying every datafile in sorted order.
+func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile, lastID int, indexUpToDate bool) (art.Tree, error) {
 	t, found, err := indexer.Load(filepath.Join(path, "index"), maxKeySize)
 	if err != nil {
 		return nil, err
 	}
-	if !found {
-		sortedDatafiles := getSortedDatafiles(datafiles)
-		for _, df := range sortedDatafiles {
-			var offset int64
-			for {
-				e, n, err := df.Read()
-				if err != nil {
-					if err == io.EOF {
-						break
-					}
-					return nil, err
-				}
-				// Tombstone value (deleted key)
-				if len(e.Value) == 0 {
-					t.Delete(e.Key)
-					offset += n
-					continue
-				}
-				item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
-				t.Insert(e.Key, item)
-				offset += n
-			}
+	if found && indexUpToDate {
+		return t, nil
+	}
+	if found {
+		if err := loadIndexFromDatafile(t, datafiles[lastID]); err != nil {
+			return nil, err
+		}
+		return t, nil
+	}
+	sortedDatafiles := getSortedDatafiles(datafiles)
+	for _, df := range sortedDatafiles {
+		if err := loadIndexFromDatafile(t, df); err != nil {
+			return nil, err
 		}
 	}
 	return t, nil
 }
+
+// loadIndexFromDatafile replays every entry of df into t: an entry with an
+// empty value is a tombstone and deletes its key, any other entry is indexed
+// by file id, offset and size.
+func loadIndexFromDatafile(t art.Tree, df data.Datafile) error {
+	var offset int64
+	for {
+		e, n, err := df.Read()
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return err
+		}
+		// Tombstone value (deleted key)
+		if len(e.Value) == 0 {
+			t.Delete(e.Key)
+			offset += n
+			continue
+		}
+		item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
+		t.Insert(e.Key, item)
+		offset += n
+	}
+	return nil
+}
+
+// loadMetadata reads meta.json from path, or returns a zero-valued MetaData
+// when the file does not exist.
+func loadMetadata(path string) (*metadata.MetaData, error) {
+	if !internal.Exists(filepath.Join(path, "meta.json")) {
+		meta := new(metadata.MetaData)
+		return meta, nil
+	}
+	return metadata.Load(filepath.Join(path, "meta.json"))
+}
diff --git a/bitcask_test.go b/bitcask_test.go
index e0c5f34..9908963 100644
--- a/bitcask_test.go
+++ b/bitcask_test.go
@@ -136,6 +136,14 @@ func TestAll(t *testing.T) {
 		assert.NoError(err)
 	})
 
+	t.Run("Backup", func(t *testing.T) {
+		path, err := ioutil.TempDir("", "backup")
+		assert.NoError(err)
+		defer os.RemoveAll(path)
+		err = db.Backup(filepath.Join(path, "db-backup"))
+		assert.NoError(err)
+	})
+
 	t.Run("Close", func(t *testing.T) {
 		err = db.Close()
 		assert.NoError(err)
@@ -1208,7 +1216,7 @@ func TestCloseErrors(t *testing.T) {
 		assert.NoError(err)
 
 		mockIndexer := new(mocks.Indexer)
-		mockIndexer.On("Save", db.trie, filepath.Join(db.path, "index")).Return(ErrMockError)
+		mockIndexer.On("Save", db.trie, filepath.Join(db.path, "temp_index")).Return(ErrMockError)
 		db.indexer = mockIndexer
 
 		err = db.Close()
diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go
new file mode 100644
index 0000000..a14ed60
--- /dev/null
+++ b/internal/metadata/metadata.go
@@ -0,0 +1,30 @@
+// Package metadata persists bookkeeping information about a bitcask database.
+package metadata
+
+import (
+	"os"
+
+	"github.com/prologic/bitcask/internal"
+)
+
+// MetaData is the bookkeeping record stored as JSON alongside the datafiles.
+type MetaData struct {
+	// IndexUpToDate reports whether the index file on disk reflects every
+	// write made to the datafiles.
+	IndexUpToDate bool `json:"index_up_to_date"`
+}
+
+// Save writes m as JSON to the file at path, created with the given mode.
+func (m *MetaData) Save(path string, mode os.FileMode) error {
+	return internal.SaveJsonToFile(m, path, mode)
+}
+
+// Load reads the JSON file at path into a new MetaData. It returns a nil
+// MetaData when the file cannot be read or decoded.
+func Load(path string) (*MetaData, error) {
+	var m MetaData
+	if err := internal.LoadFromJsonFile(path, &m); err != nil {
+		return nil, err
+	}
+	return &m, nil
+}
diff --git a/internal/utils.go b/internal/utils.go
index 7dd7a2a..1524300 100644
--- a/internal/utils.go
+++ b/internal/utils.go
@@ -1,7 +1,9 @@
 package internal
 
 import (
+	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"os"
 	"path/filepath"
 	"sort"
@@ -63,3 +65,66 @@ func ParseIds(fns []string) ([]int, error) {
 	sort.Ints(ids)
 	return ids, nil
 }
+
+// Copy recursively copies the contents of the src directory into dst, which
+// must already exist. Any entry whose base name matches one of the exclude
+// patterns (filepath.Match syntax) is skipped; an excluded directory is
+// skipped together with everything beneath it.
+func Copy(src, dst string, exclude []string) error {
+	return filepath.Walk(src, func(path string, info os.FileInfo, err error) error {
+		// Walk reports failures to stat or read an entry through err, in
+		// which case info may be nil and must not be dereferenced.
+		if err != nil {
+			return err
+		}
+		relPath, err := filepath.Rel(src, path)
+		if err != nil {
+			return err
+		}
+		if relPath == "." {
+			return nil
+		}
+		for _, e := range exclude {
+			matched, err := filepath.Match(e, info.Name())
+			if err != nil {
+				return err
+			}
+			if !matched {
+				continue
+			}
+			if info.IsDir() {
+				return filepath.SkipDir
+			}
+			return nil
+		}
+		target := filepath.Join(dst, relPath)
+		if info.IsDir() {
+			return os.MkdirAll(target, info.Mode())
+		}
+		data, err := ioutil.ReadFile(path)
+		if err != nil {
+			return err
+		}
+		return ioutil.WriteFile(target, data, info.Mode())
+	})
+}
+
+// SaveJsonToFile marshals v to JSON and writes it to the file at path,
+// creating the file with the given mode if it does not exist.
+func SaveJsonToFile(v interface{}, path string, mode os.FileMode) error {
+	b, err := json.Marshal(v)
+	if err != nil {
+		return err
+	}
+	return ioutil.WriteFile(path, b, mode)
+}
+
+// LoadFromJsonFile reads the file at path and unmarshals its JSON content
+// into v, which must be a pointer.
+func LoadFromJsonFile(path string, v interface{}) error {
+	b, err := ioutil.ReadFile(path)
+	if err != nil {
+		return err
+	}
+	return json.Unmarshal(b, v)
+}
diff --git a/internal/utils_test.go b/internal/utils_test.go
new file mode 100644
index 0000000..4ad1645
--- /dev/null
+++ b/internal/utils_test.go
@@ -0,0 +1,116 @@
+package internal
+
+import (
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_Copy(t *testing.T) {
+	assert := assert.New(t)
+	t.Run("CopyDir", func(t *testing.T) {
+		tempsrc, err := ioutil.TempDir("", "test")
+		assert.NoError(err)
+		defer os.RemoveAll(tempsrc)
+		var f *os.File
+
+		tempdir, err := ioutil.TempDir(tempsrc, "")
+		assert.NoError(err)
+
+		f, err = os.OpenFile(filepath.Join(tempsrc, "file1"), os.O_WRONLY|os.O_CREATE, 0755)
+		assert.NoError(err)
+		n, err := f.WriteString("test123")
+		assert.Equal(7, n)
+		assert.NoError(err)
+		f.Close()
+
+		f, err = os.OpenFile(filepath.Join(tempsrc, "file2"), os.O_WRONLY|os.O_CREATE, 0755)
+		assert.NoError(err)
+		n, err = f.WriteString("test1234")
+		assert.Equal(8, n)
+		assert.NoError(err)
+		f.Close()
+
+		f, err = os.OpenFile(filepath.Join(tempsrc, "file3"), os.O_WRONLY|os.O_CREATE, 0755)
+		assert.NoError(err)
+		f.Close()
+
+		tempdst, err := ioutil.TempDir("", "backup")
+		assert.NoError(err)
+		defer os.RemoveAll(tempdst)
+		err = Copy(tempsrc, tempdst, []string{"file3"})
+		assert.NoError(err)
+		buf := make([]byte, 10)
+
+		exists := Exists(filepath.Join(tempdst, filepath.Base(tempdir)))
+		assert.Equal(true, exists)
+
+		f, err = os.Open(filepath.Join(tempdst, "file1"))
+		assert.NoError(err)
+		n, err = f.Read(buf[:7])
+		assert.NoError(err)
+		assert.Equal(7, n)
+		assert.Equal([]byte("test123"), buf[:7])
+		_, err = f.Read(buf)
+		assert.Equal(io.EOF, err)
+		f.Close()
+
+		f, err = os.Open(filepath.Join(tempdst, "file2"))
+		assert.NoError(err)
+		n, err = f.Read(buf[:8])
+		assert.NoError(err)
+		assert.Equal(8, n)
+		assert.Equal([]byte("test1234"), buf[:8])
+		_, err = f.Read(buf)
+		assert.Equal(io.EOF, err)
+		f.Close()
+
+		exists = Exists(filepath.Join(tempdst, "file3"))
+		assert.Equal(false, exists)
+	})
+
+	t.Run("CopyMissingSource", func(t *testing.T) {
+		tempdst, err := ioutil.TempDir("", "backup")
+		assert.NoError(err)
+		defer os.RemoveAll(tempdst)
+		err = Copy(filepath.Join(tempdst, "missing"), tempdst, nil)
+		assert.Error(err)
+	})
+}
+
+func Test_SaveAndLoad(t *testing.T) {
+	assert := assert.New(t)
+	t.Run("save and load", func(t *testing.T) {
+		tempdir, err := ioutil.TempDir("", "bitcask")
+		assert.NoError(err)
+		defer os.RemoveAll(tempdir)
+		type test struct {
+			Value bool `json:"value"`
+		}
+		m := test{Value: true}
+		err = SaveJsonToFile(&m, filepath.Join(tempdir, "meta.json"), 0755)
+		assert.NoError(err)
+		m1 := test{}
+		err = LoadFromJsonFile(filepath.Join(tempdir, "meta.json"), &m1)
+		assert.NoError(err)
+		assert.Equal(m, m1)
+	})
+
+	t.Run("save and load error", func(t *testing.T) {
+		tempdir, err := ioutil.TempDir("", "bitcask")
+		assert.NoError(err)
+		defer os.RemoveAll(tempdir)
+		type test struct {
+			Value bool `json:"value"`
+		}
+		err = SaveJsonToFile(make(chan int), filepath.Join(tempdir, "meta.json"), 0755)
+		assert.Error(err)
+		m1 := test{}
+		err = LoadFromJsonFile(filepath.Join(tempdir, "meta.json"), &m1)
+		assert.Error(err)
+	})
+}