From 8dca9cd2a72cb3fc2d0b6cf4b3296a741a518462 Mon Sep 17 00:00:00 2001 From: Ignacio Hagopian Date: Thu, 7 May 2020 14:48:36 -0300 Subject: [PATCH] Auto recovery (#153) * implement autorepair Signed-off-by: Ignacio Hagopian * fix misspell Signed-off-by: Ignacio Hagopian * Update internal/data/recover.go Co-authored-by: James Mills * Update internal/utils.go Co-authored-by: James Mills * Update internal/data/recover.go Co-authored-by: James Mills * skip failing test on windows Signed-off-by: Ignacio Hagopian Co-authored-by: James Mills --- bitcask.go | 8 +++- bitcask_test.go | 81 +++++++++++++++++++++++++++++++++++ internal/config/config.go | 1 + internal/data/recover.go | 89 +++++++++++++++++++++++++++++++++++++++ internal/utils.go | 3 +- options.go | 12 ++++++ 6 files changed, 191 insertions(+), 3 deletions(-) create mode 100644 internal/data/recover.go diff --git a/bitcask.go b/bitcask.go index 8e22a39..2b04e89 100644 --- a/bitcask.go +++ b/bitcask.go @@ -2,6 +2,7 @@ package bitcask import ( "errors" + "fmt" "hash/crc32" "io" "io/ioutil" @@ -310,7 +311,6 @@ func (b *Bitcask) Reopen() error { if err != nil { return err } - t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles) if err != nil { return err @@ -458,6 +458,11 @@ func Open(path string, options ...Option) (*Bitcask, error) { return nil, err } + if cfg.AutoRecovery { + if err := data.CheckAndRecover(path, cfg); err != nil { + return nil, fmt.Errorf("recovering database: %s", err) + } + } if err := bitcask.Reopen(); err != nil { return nil, err } @@ -520,7 +525,6 @@ func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles } return nil, err } - // Tombstone value (deleted key) if len(e.Value) == 0 { t.Delete(e.Key) diff --git a/bitcask_test.go b/bitcask_test.go index 1faefbc..a645743 100644 --- a/bitcask_test.go +++ b/bitcask_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "os" + "path" "path/filepath" "reflect" "runtime" @@ -15,6 +16,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/prologic/bitcask/internal" "github.com/prologic/bitcask/internal/config" @@ -334,6 +336,85 @@ func TestConfigErrors(t *testing.T) { }) } +func TestAutoRecovery(t *testing.T) { + if runtime.GOOS == "windows" { + t.SkipNow() + } + withAutoRecovery := []bool{false, true} + + for _, autoRecovery := range withAutoRecovery { + t.Run(fmt.Sprintf("%v", autoRecovery), func(t *testing.T) { + require := require.New(t) + testdir, err := ioutil.TempDir("", "bitcask") + require.NoError(err) + db, err := Open(testdir) + require.NoError(err) + + // Insert 10 key-value pairs and verify all is ok. + makeKeyVal := func(i int) ([]byte, []byte) { + return []byte(fmt.Sprintf("foo%d", i)), []byte(fmt.Sprintf("bar%d", i)) + } + n := 10 + for i := 0; i < n; i++ { + key, val := makeKeyVal(i) + err = db.Put(key, val) + require.NoError(err) + } + for i := 0; i < n; i++ { + key, val := makeKeyVal(i) + rval, err := db.Get(key) + require.NoError(err) + require.Equal(val, rval) + } + err = db.Close() + require.NoError(err) + + // Corrupt the last inserted key + f, err := os.OpenFile(path.Join(testdir, "000000000.data"), os.O_RDWR, 0755) + require.NoError(err) + fi, err := f.Stat() + require.NoError(err) + err = f.Truncate(fi.Size() - 1) + require.NoError(err) + err = f.Close() + require.NoError(err) + + db, err = Open(testdir, WithAutoRecovery(autoRecovery)) + require.NoError(err) + defer db.Close() + // Check that all values but the last are still intact. + for i := 0; i < 9; i++ { + key, val := makeKeyVal(i) + rval, err := db.Get(key) + require.NoError(err) + require.Equal(val, rval) + } + // Check the index has no more keys than non-corrupted ones. + // i.e: all but the last one. + numKeys := 0 + for range db.Keys() { + numKeys++ + } + if !autoRecovery { + // We are opening without autorepair, and thus are + // in a corrupted state. The index isn't coherent with + // the datafile. + require.Equal(n, numKeys) + return + } + + require.Equal(n-1, numKeys, "The index should have n-1 keys") + + // Double-check explicitly the corrupted one isn't here. + // This check is redundant considering the last two checks, + // but doesn't hurt. + corrKey, _ := makeKeyVal(9) + _, err = db.Get(corrKey) + require.Equal(ErrKeyNotFound, err) + }) + } +} + func TestReIndex(t *testing.T) { assert := assert.New(t) diff --git a/internal/config/config.go b/internal/config/config.go index 76a8d0b..136facc 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,6 +12,7 @@ type Config struct { MaxKeySize uint32 `json:"max_key_size"` MaxValueSize uint64 `json:"max_value_size"` Sync bool `json:"sync"` + AutoRecovery bool `json:"autorecovery"` } // Load loads a configuration from the given path diff --git a/internal/data/recover.go b/internal/data/recover.go new file mode 100644 index 0000000..d9f01ea --- /dev/null +++ b/internal/data/recover.go @@ -0,0 +1,89 @@ +package data + +import ( + "fmt" + "io" + "os" + "path/filepath" + + "github.com/prologic/bitcask/internal" + "github.com/prologic/bitcask/internal/config" + "github.com/prologic/bitcask/internal/data/codec" +) + +// CheckAndRecover checks and recovers the last datafile. +// If the datafile isn't corrupted, this is a noop. If it is, +// the longest non-corrupted prefix will be kept and the rest +// will be *deleted*. Also, the index file is also *deleted* which +// will be automatically recreated on next startup. +func CheckAndRecover(path string, cfg *config.Config) error { + dfs, err := internal.GetDatafiles(path) + if err != nil { + return fmt.Errorf("scanning datafiles: %s", err) + } + if len(dfs) == 0 { + return nil + } + f := dfs[len(dfs)-1] + recovered, err := recoverDatafile(f, cfg) + if err != nil { + return fmt.Errorf("recovering data file") + } + if recovered { + if err := os.Remove(filepath.Join(path, "index")); err != nil { + return fmt.Errorf("error deleting the index on recovery: %s", err) + } + } + return nil +} + +func recoverDatafile(path string, cfg *config.Config) (recovered bool, err error) { + f, err := os.Open(path) + if err != nil { + return false, fmt.Errorf("opening the datafile: %s", err) + } + defer func() { + err = f.Close() + }() + _, file := filepath.Split(path) + rPath := fmt.Sprintf("%s.recovered", file) + fr, err := os.OpenFile(rPath, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + return false, fmt.Errorf("creating the recovered datafile: %w", err) + } + defer func() { + err = fr.Close() + }() + + dec := codec.NewDecoder(f, cfg.MaxKeySize, cfg.MaxValueSize) + enc := codec.NewEncoder(fr) + e := internal.Entry{} + + corrupted := false + for !corrupted { + _, err = dec.Decode(&e) + if err == io.EOF { + break + } + if codec.IsCorruptedData(err) { + corrupted = true + continue + } + if err != nil { + return false, fmt.Errorf("unexpected error while reading datafile: %w", err) + } + if _, err := enc.Encode(e); err != nil { + return false, fmt.Errorf("writing to recovered datafile: %w", err) + } + } + if !corrupted { + if err := os.Remove(fr.Name()); err != nil { + return false, fmt.Errorf("can't remove temporal recovered datafile: %w", err) + } + return false, nil + } + if err := os.Rename(rPath, path); err != nil { + return false, fmt.Errorf("removing corrupted file: %s", err) + } + return true, nil +} diff --git a/internal/utils.go b/internal/utils.go index a10f402..7dd7a2a 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -33,7 +33,8 @@ func DirSize(path string) (int64, error) { // GetDatafiles returns a list of all data files stored in the database path // given by `path`. All datafiles are identified by the the glob `*.data` and -// the basename is represented by an monotomic increasing integer. +// the basename is represented by a monotonic increasing integer. +// The returned files are *sorted* in increasing order. func GetDatafiles(path string) ([]string, error) { fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path)) if err != nil { diff --git a/options.go b/options.go index 14c66d8..937780a 100644 --- a/options.go +++ b/options.go @@ -14,11 +14,23 @@ const ( // DefaultSync is the default file synchronization action DefaultSync = false + + // DefaultAutoRecovery is the default auto-recovery action. ) // Option is a function that takes a config struct and modifies it type Option func(*config.Config) error +// WithAutoRecovery sets auto recovery of data and index file recreation. +// IMPORTANT: This flag MUST BE used only if a proper backup was made of all +// the existing datafiles. +func WithAutoRecovery(enabled bool) Option { + return func(cfg *config.Config) error { + cfg.AutoRecovery = enabled + return nil + } +} + // WithMaxDatafileSize sets the maximum datafile size option func WithMaxDatafileSize(size int) Option { return func(cfg *config.Config) error {