From 6fe6fe0689eba9d50f1ff043dd3e27cc439f71bd Mon Sep 17 00:00:00 2001
From: James Mills <1290234+prologic@users.noreply.github.com>
Date: Sun, 17 Mar 2019 13:53:30 +1000
Subject: [PATCH] Refactored configuration option handling. Fixes #3

---
 bitcask.go | 18 +++++++++---------
 options.go | 29 +++++++++++++++++------------
 2 files changed, 26 insertions(+), 21 deletions(-)

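Note for reviewers: a caller-side usage sketch of the refactored API follows; it is not part of the diff below. The import path github.com/prologic/bitcask and the concrete sizes are assumptions for illustration; only the Open signature and the exported WithMax* constructors come from this patch.

package main

import (
	"log"

	"github.com/prologic/bitcask"
)

func main() {
	// Open a database, overriding two defaults via the new functional
	// options; anything not set keeps the value from NewDefaultConfig().
	db, err := bitcask.Open(
		"/tmp/db",
		bitcask.WithMaxKeySize(128),
		bitcask.WithMaxValueSize(1<<20),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Put("hello", []byte("world")); err != nil {
		log.Fatal(err)
	}
}
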
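Because an option is now a func(*config) error, a constructor can validate its argument and make Open fail early. The sketch below is hypothetical: WithCheckedMaxDatafileSize and its size > 0 rule are not part of this patch, and it is assumed to live inside package bitcask next to options.go.

package bitcask

import "fmt"

// WithCheckedMaxDatafileSize is a hypothetical option constructor showing
// how the new func(*config) error shape allows input validation before the
// value is written into the shared config.
func WithCheckedMaxDatafileSize(size int) option {
	return func(cfg *config) error {
		if size <= 0 {
			return fmt.Errorf("error: invalid MaxDatafileSize %d", size)
		}
		cfg.MaxDatafileSize = size
		return nil
	}
}
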
diff --git a/bitcask.go b/bitcask.go
index 1f6ba99..5f3b61d 100644
--- a/bitcask.go
+++ b/bitcask.go
@@ -16,7 +16,7 @@ import (
 type Bitcask struct {
 	*flock.Flock
 
-	opts      Options
+	config    *config
 	path      string
 	curr      *Datafile
 	keydir    *Keydir
@@ -70,11 +70,11 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
 }
 
 func (b *Bitcask) Put(key string, value []byte) error {
-	if len(key) > b.opts.MaxKeySize {
-		return fmt.Errorf("error: key too large %d > %d", len(key), b.opts.MaxKeySize)
+	if len(key) > b.config.MaxKeySize {
+		return fmt.Errorf("error: key too large %d > %d", len(key), b.config.MaxKeySize)
 	}
-	if len(value) > b.opts.MaxValueSize {
-		return fmt.Errorf("error: value too large %d > %d", len(value), b.opts.MaxValueSize)
+	if len(value) > b.config.MaxValueSize {
+		return fmt.Errorf("error: value too large %d > %d", len(value), b.config.MaxValueSize)
 	}
 
 	offset, err := b.put(key, value)
@@ -261,7 +261,7 @@ func Merge(path string, force bool) error {
 	return nil
 }
 
-func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
+func Open(path string, options ...option) (*Bitcask, error) {
 	if err := os.MkdirAll(path, 0755); err != nil {
 		return nil, err
 	}
@@ -344,7 +344,7 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
 
 	bitcask := &Bitcask{
 		Flock:     flock.New(filepath.Join(path, "lock")),
-		opts:      NewDefaultOptions(),
+		config:    NewDefaultConfig(),
 		path:      path,
 		curr:      curr,
 		keydir:    keydir,
@@ -354,8 +354,8 @@ func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) {
 		maxDatafileSize: DefaultMaxDatafileSize,
 	}
 
-	for _, option := range options {
-		err = option(bitcask)
+	for _, opt := range options {
+		err = opt(bitcask.config)
 		if err != nil {
 			return nil, err
 		}
diff --git a/options.go b/options.go
index 3279172..497c823 100644
--- a/options.go
+++ b/options.go
@@ -6,37 +6,42 @@ const (
 	DefaultMaxValueSize = 1 << 16 // 65KB
 )
 
-type Options struct {
+// Option ...
+type Option option
+
+type option func(*config) error
+
+type config struct {
 	MaxDatafileSize int
 	MaxKeySize      int
 	MaxValueSize    int
 }
 
-func NewDefaultOptions() Options {
-	return Options{
+func NewDefaultConfig() *config {
+	return &config{
 		MaxDatafileSize: DefaultMaxDatafileSize,
 		MaxKeySize:      DefaultMaxKeySize,
 		MaxValueSize:    DefaultMaxValueSize,
 	}
 }
 
-func WithMaxDatafileSize(size int) func(*Bitcask) error {
-	return func(b *Bitcask) error {
-		b.opts.MaxDatafileSize = size
+func WithMaxDatafileSize(size int) option {
+	return func(cfg *config) error {
+		cfg.MaxDatafileSize = size
 		return nil
 	}
 }
 
-func WithMaxKeySize(size int) func(*Bitcask) error {
-	return func(b *Bitcask) error {
-		b.opts.MaxKeySize = size
+func WithMaxKeySize(size int) option {
+	return func(cfg *config) error {
+		cfg.MaxKeySize = size
 		return nil
 	}
 }
 
-func WithMaxValueSize(size int) func(*Bitcask) error {
-	return func(b *Bitcask) error {
-		b.opts.MaxValueSize = size
+func WithMaxValueSize(size int) option {
+	return func(cfg *config) error {
+		cfg.MaxValueSize = size
 		return nil
 	}
 }
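
A small illustrative test sketch for the new constructors; the test name, placement, and chosen values are assumptions, and it must live in package bitcask so it can reach the unexported option and config types.

package bitcask

import "testing"

// TestWithOptions checks that the option constructors mutate a config as
// expected when applied in sequence, mirroring the loop in Open.
func TestWithOptions(t *testing.T) {
	cfg := NewDefaultConfig()

	opts := []option{
		WithMaxDatafileSize(1 << 20),
		WithMaxKeySize(128),
		WithMaxValueSize(1 << 10),
	}
	for _, opt := range opts {
		if err := opt(cfg); err != nil {
			t.Fatalf("applying option: %s", err)
		}
	}

	if cfg.MaxDatafileSize != 1<<20 || cfg.MaxKeySize != 128 || cfg.MaxValueSize != 1<<10 {
		t.Fatalf("unexpected config: %+v", cfg)
	}
}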