mirror of https://github.com/taigrr/bitcask
synced 2025-01-18 04:03:17 -08:00
Auto recovery (#153)
* implement autorepair
  Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>
* fix misspell
  Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>
* Update internal/data/recover.go
  Co-authored-by: James Mills <prologic@shortcircuit.net.au>
* Update internal/utils.go
  Co-authored-by: James Mills <prologic@shortcircuit.net.au>
* Update internal/data/recover.go
  Co-authored-by: James Mills <prologic@shortcircuit.net.au>
* skip failing test on windows
  Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>

Co-authored-by: James Mills <prologic@shortcircuit.net.au>
This commit is contained in:
parent 7b24d87695
commit 8dca9cd2a7
@@ -2,6 +2,7 @@ package bitcask

import (
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"io/ioutil"
@@ -310,7 +311,6 @@ func (b *Bitcask) Reopen() error {
	if err != nil {
		return err
	}

	t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles)
	if err != nil {
		return err
@@ -458,6 +458,11 @@ func Open(path string, options ...Option) (*Bitcask, error) {
		return nil, err
	}

	if cfg.AutoRecovery {
		if err := data.CheckAndRecover(path, cfg); err != nil {
			return nil, fmt.Errorf("recovering database: %s", err)
		}
	}
	if err := bitcask.Reopen(); err != nil {
		return nil, err
	}
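From the caller's side, the new check is opt-in via the option added in options.go below. A minimal sketch of opening a store with recovery enabled (the /tmp/mydb path is illustrative; Open, WithAutoRecovery, Put, and Close are the package's public API as shown in this diff):

package main

import (
	"log"

	"github.com/prologic/bitcask"
)

func main() {
	// Opt in to auto-recovery: if the last datafile has a corrupted
	// tail, Open keeps the longest valid prefix and deletes the index
	// so it is rebuilt from the datafiles.
	db, err := bitcask.Open("/tmp/mydb", bitcask.WithAutoRecovery(true))
	if err != nil {
		log.Fatalf("opening database: %s", err)
	}
	defer db.Close()

	if err := db.Put([]byte("hello"), []byte("world")); err != nil {
		log.Fatalf("put: %s", err)
	}
}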
@@ -520,7 +525,6 @@ func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles
			}
			return nil, err
		}

		// Tombstone value (deleted key)
		if len(e.Value) == 0 {
			t.Delete(e.Key)
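The tombstone check above is why deletes persist across restarts: Delete appends the key with an empty value, and loadIndex drops any key whose most recent entry is empty while rebuilding the index. A small sketch of the observable behavior, assuming the package's public Delete and Has methods:

package main

import (
	"fmt"
	"io/ioutil"
	"log"

	"github.com/prologic/bitcask"
)

func main() {
	dir, _ := ioutil.TempDir("", "bitcask-tombstone")

	db, err := bitcask.Open(dir)
	if err != nil {
		log.Fatal(err)
	}
	db.Put([]byte("k"), []byte("v"))
	db.Delete([]byte("k")) // appends a tombstone: the key with an empty value
	db.Close()

	// On reopen, loadIndex replays the datafiles and drops
	// tombstoned keys from the rebuilt index.
	db, err = bitcask.Open(dir)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	fmt.Println(db.Has([]byte("k"))) // false
}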
@@ -6,6 +6,7 @@ import (
	"fmt"
	"io/ioutil"
	"os"
	"path"
	"path/filepath"
	"reflect"
	"runtime"
@@ -15,6 +16,7 @@ import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/prologic/bitcask/internal"
	"github.com/prologic/bitcask/internal/config"
@@ -334,6 +336,85 @@ func TestConfigErrors(t *testing.T) {
	})
}

func TestAutoRecovery(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.SkipNow()
	}
	withAutoRecovery := []bool{false, true}

	for _, autoRecovery := range withAutoRecovery {
		t.Run(fmt.Sprintf("%v", autoRecovery), func(t *testing.T) {
			require := require.New(t)
			testdir, err := ioutil.TempDir("", "bitcask")
			require.NoError(err)
			db, err := Open(testdir)
			require.NoError(err)

			// Insert 10 key-value pairs and verify all is ok.
			makeKeyVal := func(i int) ([]byte, []byte) {
				return []byte(fmt.Sprintf("foo%d", i)), []byte(fmt.Sprintf("bar%d", i))
			}
			n := 10
			for i := 0; i < n; i++ {
				key, val := makeKeyVal(i)
				err = db.Put(key, val)
				require.NoError(err)
			}
			for i := 0; i < n; i++ {
				key, val := makeKeyVal(i)
				rval, err := db.Get(key)
				require.NoError(err)
				require.Equal(val, rval)
			}
			err = db.Close()
			require.NoError(err)

			// Corrupt the last inserted key
			f, err := os.OpenFile(path.Join(testdir, "000000000.data"), os.O_RDWR, 0755)
			require.NoError(err)
			fi, err := f.Stat()
			require.NoError(err)
			err = f.Truncate(fi.Size() - 1)
			require.NoError(err)
			err = f.Close()
			require.NoError(err)

			db, err = Open(testdir, WithAutoRecovery(autoRecovery))
			require.NoError(err)
			defer db.Close()
			// Check that all values but the last are still intact.
			for i := 0; i < 9; i++ {
				key, val := makeKeyVal(i)
				rval, err := db.Get(key)
				require.NoError(err)
				require.Equal(val, rval)
			}
			// Check the index has no more keys than non-corrupted ones.
			// i.e: all but the last one.
			numKeys := 0
			for range db.Keys() {
				numKeys++
			}
			if !autoRecovery {
				// We are opening without autorepair, and thus are
				// in a corrupted state. The index isn't coherent with
				// the datafile.
				require.Equal(n, numKeys)
				return
			}

			require.Equal(n-1, numKeys, "The index should have n-1 keys")

			// Double-check explicitly the corrupted one isn't here.
			// This check is redundant considering the last two checks,
			// but doesn't hurt.
			corrKey, _ := makeKeyVal(9)
			_, err = db.Get(corrKey)
			require.Equal(ErrKeyNotFound, err)
		})
	}
}

func TestReIndex(t *testing.T) {
	assert := assert.New(t)
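A note on the corruption trick in this test: truncating a single byte only damages the final record, because each record carries its own size header and checksum, so every earlier record still decodes cleanly. The following generic sketch illustrates the checksum half of that idea; it is not this codec's actual wire format (that lives in internal/data/codec):

package main

import (
	"fmt"
	"hash/crc32"
)

// verify reports whether payload still matches the checksum stored
// alongside it. Real bitcask records also carry key/value sizes; this
// shows only the CRC part of the idea.
func verify(payload []byte, sum uint32) bool {
	return crc32.ChecksumIEEE(payload) == sum
}

func main() {
	payload := []byte("bar9")
	sum := crc32.ChecksumIEEE(payload)

	fmt.Println(verify(payload, sum))                  // true: intact record
	fmt.Println(verify(payload[:len(payload)-1], sum)) // false: truncated tail
}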
@@ -12,6 +12,7 @@ type Config struct {
	MaxKeySize   uint32 `json:"max_key_size"`
	MaxValueSize uint64 `json:"max_value_size"`
	Sync         bool   `json:"sync"`
	AutoRecovery bool   `json:"autorecovery"`
}

// Load loads a configuration from the given path
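Since Config carries json tags, the persisted config file gains a matching autorecovery field. A quick sketch of what the serialized form looks like, using a local copy of the struct and illustrative size limits:

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Config mirrors the updated struct above (local copy for illustration).
type Config struct {
	MaxKeySize   uint32 `json:"max_key_size"`
	MaxValueSize uint64 `json:"max_value_size"`
	Sync         bool   `json:"sync"`
	AutoRecovery bool   `json:"autorecovery"`
}

func main() {
	cfg := Config{MaxKeySize: 64, MaxValueSize: 65536, AutoRecovery: true}
	b, err := json.MarshalIndent(cfg, "", "  ")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b))
	// {
	//   "max_key_size": 64,
	//   "max_value_size": 65536,
	//   "sync": false,
	//   "autorecovery": true
	// }
}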
internal/data/recover.go (new file, 89 lines)
@@ -0,0 +1,89 @@
package data

import (
	"fmt"
	"io"
	"os"
	"path/filepath"

	"github.com/prologic/bitcask/internal"
	"github.com/prologic/bitcask/internal/config"
	"github.com/prologic/bitcask/internal/data/codec"
)

// CheckAndRecover checks and recovers the last datafile.
// If the datafile isn't corrupted, this is a noop. If it is,
// the longest non-corrupted prefix will be kept and the rest
// will be *deleted*. The index file is also *deleted*; it will
// be automatically recreated on the next startup.
func CheckAndRecover(path string, cfg *config.Config) error {
	dfs, err := internal.GetDatafiles(path)
	if err != nil {
		return fmt.Errorf("scanning datafiles: %s", err)
	}
	if len(dfs) == 0 {
		return nil
	}
	f := dfs[len(dfs)-1]
	recovered, err := recoverDatafile(f, cfg)
	if err != nil {
		return fmt.Errorf("recovering data file: %w", err)
	}
	if recovered {
		if err := os.Remove(filepath.Join(path, "index")); err != nil {
			return fmt.Errorf("error deleting the index on recovery: %s", err)
		}
	}
	return nil
}

func recoverDatafile(path string, cfg *config.Config) (recovered bool, err error) {
	f, err := os.Open(path)
	if err != nil {
		return false, fmt.Errorf("opening the datafile: %s", err)
	}
	defer func() {
		err = f.Close()
	}()
	_, file := filepath.Split(path)
	rPath := fmt.Sprintf("%s.recovered", file)
	fr, err := os.OpenFile(rPath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
	if err != nil {
		return false, fmt.Errorf("creating the recovered datafile: %w", err)
	}
	defer func() {
		err = fr.Close()
	}()

	dec := codec.NewDecoder(f, cfg.MaxKeySize, cfg.MaxValueSize)
	enc := codec.NewEncoder(fr)
	e := internal.Entry{}

	corrupted := false
	for !corrupted {
		_, err = dec.Decode(&e)
		if err == io.EOF {
			break
		}
		if codec.IsCorruptedData(err) {
			corrupted = true
			continue
		}
		if err != nil {
			return false, fmt.Errorf("unexpected error while reading datafile: %w", err)
		}
		if _, err := enc.Encode(e); err != nil {
			return false, fmt.Errorf("writing to recovered datafile: %w", err)
		}
	}
	if !corrupted {
		if err := os.Remove(fr.Name()); err != nil {
			return false, fmt.Errorf("can't remove temporary recovered datafile: %w", err)
		}
		return false, nil
	}
	if err := os.Rename(rPath, path); err != nil {
		return false, fmt.Errorf("removing corrupted file: %s", err)
	}
	return true, nil
}
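End to end, this recovery path can be exercised with the public API alone, mirroring the test above: write entries, chop one byte off the datafile's tail, and reopen with recovery enabled. A sketch (the zero-padded datafile name matches the one used in the test; some errors are ignored for brevity):

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	"github.com/prologic/bitcask"
)

func main() {
	dir, _ := ioutil.TempDir("", "bitcask-recovery")
	defer os.RemoveAll(dir)

	db, err := bitcask.Open(dir)
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 10; i++ {
		db.Put([]byte(fmt.Sprintf("foo%d", i)), []byte(fmt.Sprintf("bar%d", i)))
	}
	db.Close()

	// Simulate a crash mid-write: dropping the final byte corrupts
	// only the last record in the datafile.
	df := filepath.Join(dir, "000000000.data")
	fi, _ := os.Stat(df)
	if err := os.Truncate(df, fi.Size()-1); err != nil {
		log.Fatal(err)
	}

	// Reopen with auto-recovery: the corrupted tail is truncated away
	// and the index file is deleted and rebuilt from the datafiles.
	db, err = bitcask.Open(dir, bitcask.WithAutoRecovery(true))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	val, err := db.Get([]byte("foo0"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(val)) // bar0: the nine intact entries survive
}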
@@ -33,7 +33,8 @@ func DirSize(path string) (int64, error) {

// GetDatafiles returns a list of all data files stored in the database path
// given by `path`. All datafiles are identified by the the glob `*.data` and
-// the basename is represented by an monotomic increasing integer.
+// the basename is represented by a monotonic increasing integer.
+// The returned files are *sorted* in increasing order.
func GetDatafiles(path string) ([]string, error) {
	fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
	if err != nil {
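Because the basenames are zero-padded (the test references 000000000.data), plain lexicographic sorting of the glob results agrees with numeric order, which is what lets GetDatafiles return the files oldest-first. A tiny sketch of that property:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Zero-padded basenames make lexicographic order equal numeric order.
	names := []string{"000000002.data", "000000000.data", "000000001.data"}
	sort.Strings(names)
	fmt.Println(names) // [000000000.data 000000001.data 000000002.data]
}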
options.go (12 changed lines)
@@ -14,11 +14,23 @@ const (

	// DefaultSync is the default file synchronization action
	DefaultSync = false

	// DefaultAutoRecovery is the default auto-recovery action.
	DefaultAutoRecovery = false
)

// Option is a function that takes a config struct and modifies it
type Option func(*config.Config) error

// WithAutoRecovery sets auto recovery of data and index file recreation.
// IMPORTANT: This flag MUST BE used only if a proper backup was made of all
// the existing datafiles.
func WithAutoRecovery(enabled bool) Option {
	return func(cfg *config.Config) error {
		cfg.AutoRecovery = enabled
		return nil
	}
}

// WithMaxDatafileSize sets the maximum datafile size option
func WithMaxDatafileSize(size int) Option {
	return func(cfg *config.Config) error {