codebeat: Code quality improvement (#103)

* codebeat: improve & bugfix

* codebeat: refactor to improve readability

* bugfix

* bugfix

* internal/data/codec: improve code coverage
This commit is contained in:
Ignacio Hagopian
2019-09-23 18:19:07 -03:00
committed by James Mills
parent 42c2b810bf
commit 498ea4069c
3 changed files with 105 additions and 67 deletions

View File

@@ -284,61 +284,17 @@ func (b *Bitcask) reopen() error {
b.mu.Lock()
defer b.mu.Unlock()
fns, err := internal.GetDatafiles(b.path)
datafiles, lastID, err := loadDatafiles(b.path, b.config.MaxKeySize, b.config.MaxValueSize)
if err != nil {
return err
}
ids, err := internal.ParseIds(fns)
t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles)
if err != nil {
return err
}
datafiles := make(map[int]data.Datafile, len(ids))
for _, id := range ids {
df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize)
if err != nil {
return err
}
datafiles[id] = df
}
t, found, err := b.indexer.Load(filepath.Join(b.path, "index"), b.config.MaxKeySize)
if err != nil {
return err
}
if !found {
for i, df := range datafiles {
var offset int64
for {
e, n, err := df.Read()
if err != nil {
if err == io.EOF {
break
}
return err
}
// Tombstone value (deleted key)
if len(e.Value) == 0 {
t.Delete(e.Key)
offset += n
continue
}
item := internal.Item{FileID: ids[i], Offset: offset, Size: n}
t.Insert(e.Key, item)
offset += n
}
}
}
var id int
if len(ids) > 0 {
id = ids[(len(ids) - 1)]
}
curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize)
curr, err := data.NewDatafile(b.path, lastID, false, b.config.MaxKeySize, b.config.MaxValueSize)
if err != nil {
return err
}
@@ -486,3 +442,60 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return bitcask, nil
}
func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64) (datafiles map[int]data.Datafile, lastID int, err error) {
fns, err := internal.GetDatafiles(path)
if err != nil {
return nil, 0, err
}
ids, err := internal.ParseIds(fns)
if err != nil {
return nil, 0, err
}
datafiles = make(map[int]data.Datafile, len(ids))
for _, id := range ids {
datafiles[id], err = data.NewDatafile(path, id, true, maxKeySize, maxValueSize)
if err != nil {
return
}
}
if len(ids) > 0 {
lastID = ids[len(ids)-1]
}
return
}
func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile) (art.Tree, error) {
t, found, err := indexer.Load(filepath.Join(path, "index"), maxKeySize)
if err != nil {
return nil, err
}
if !found {
for _, df := range datafiles {
var offset int64
for {
e, n, err := df.Read()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
// Tombstone value (deleted key)
if len(e.Value) == 0 {
t.Delete(e.Key)
offset += n
continue
}
item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
t.Insert(e.Key, item)
offset += n
}
}
}
return t, nil
}