
Replace keydir with ART trie (#75)

* Replace keydir with ART trie

* Address some review feedback

* Address review feedback (consts)
James Mills authored on 2019-09-02 08:38:56 +10:00, committed by GitHub
parent 36bc134b22
commit abbbeb8e1d
8 changed files with 201 additions and 228 deletions
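
The gist of the change: the in-memory keydir (a hash map from key to on-disk location) is replaced by an adaptive radix tree from github.com/plar/go-adaptive-radix-tree, guarded by the store's existing RWMutex. Below is a minimal standalone sketch of that pattern, using only the trie calls that appear in the diff (Insert, Search, ForEachPrefix) and a local copy of the internal.Item record this commit introduces:

package main

import (
	"fmt"
	"sync"

	art "github.com/plar/go-adaptive-radix-tree"
)

// Item is a local copy of the internal.Item record added by this commit:
// it points at a value's location on disk rather than holding the value.
type Item struct {
	FileID int
	Offset int64
	Size   int64
}

func main() {
	var mu sync.RWMutex
	trie := art.New()

	// Insert takes over from keydir.Add: each key maps to an Item.
	mu.Lock()
	trie.Insert(art.Key("user:1"), Item{FileID: 0, Offset: 0, Size: 21})
	trie.Insert(art.Key("user:2"), Item{FileID: 0, Offset: 21, Size: 19})
	mu.Unlock()

	// Search takes over from keydir.Get; values come back as interface{}.
	mu.RLock()
	if v, found := trie.Search(art.Key("user:1")); found {
		fmt.Printf("item: %+v\n", v.(Item))
	}
	mu.RUnlock()

	// ForEachPrefix is what makes Scan cheap: it visits only the subtree
	// under the prefix instead of filtering every key in the store.
	mu.RLock()
	trie.ForEachPrefix(art.Key("user:"), func(node art.Node) bool {
		if len(node.Key()) == 0 {
			return true // skip the root node, as the patch does
		}
		fmt.Printf("key: %s\n", node.Key())
		return true
	})
	mu.RUnlock()
}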


@@ -1,7 +1,6 @@
package bitcask
import (
"bytes"
"encoding/json"
"errors"
"hash/crc32"
@@ -13,8 +12,7 @@ import (
"sync"
"github.com/gofrs/flock"
"github.com/plar/go-adaptive-radix-tree"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
@@ -51,7 +49,6 @@ type Bitcask struct {
options []Option
path string
curr *internal.Datafile
keydir *internal.Keydir
datafiles map[int]*internal.Datafile
trie art.Tree
}
@@ -74,7 +71,9 @@ func (b *Bitcask) Stats() (stats Stats, err error) {
}
stats.Datafiles = len(b.datafiles)
stats.Keys = b.keydir.Len()
b.mu.RLock()
stats.Keys = b.trie.Size()
b.mu.RUnlock()
stats.Size = size
return
@@ -89,7 +88,16 @@ func (b *Bitcask) Close() error {
os.Remove(b.Flock.Path())
}()
if err := b.keydir.Save(path.Join(b.path, "index")); err != nil {
f, err := os.OpenFile(filepath.Join(b.path, "index"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
defer f.Close()
if err := internal.WriteIndex(b.trie, f); err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
@@ -112,11 +120,15 @@ func (b *Bitcask) Sync() error {
func (b *Bitcask) Get(key []byte) ([]byte, error) {
var df *internal.Datafile
item, ok := b.keydir.Get(key)
if !ok {
b.mu.RLock()
value, found := b.trie.Search(key)
b.mu.RUnlock()
if !found {
return nil, ErrKeyNotFound
}
item := value.(internal.Item)
if item.FileID == b.curr.FileID() {
df = b.curr
} else {
@@ -138,8 +150,10 @@ func (b *Bitcask) Get(key []byte) ([]byte, error) {
// Has returns true if the key exists in the database, false otherwise.
func (b *Bitcask) Has(key []byte) bool {
_, ok := b.keydir.Get(key)
return ok
b.mu.RLock()
_, found := b.trie.Search(key)
b.mu.RUnlock()
return found
}
// Put stores the key and value in the database.
@@ -162,11 +176,10 @@ func (b *Bitcask) Put(key, value []byte) error {
}
}
item := b.keydir.Add(key, b.curr.FileID(), offset, n)
if b.config.greedyScan {
item := internal.Item{b.curr.FileID(), offset, n}
b.mu.Lock()
b.trie.Insert(key, item)
}
b.mu.Unlock()
return nil
}
@@ -179,11 +192,9 @@ func (b *Bitcask) Delete(key []byte) error {
return err
}
b.keydir.Delete(key)
if b.config.greedyScan {
b.mu.Lock()
b.trie.Delete(key)
}
b.mu.Unlock()
return nil
}
@@ -191,48 +202,65 @@ func (b *Bitcask) Delete(key []byte) error {
// Scan performs a prefix scan of keys matching the given prefix and calling
// the function `f` with the keys found. If the function returns an error
// no further keys are processed and the first error returned.
func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) error {
if b.config.greedyScan {
func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
if err := f(node.Key()); err != nil {
// Skip the root node
if len(node.Key()) == 0 {
return true
}
if err = f(node.Key()); err != nil {
return false
}
return true
})
return nil
}
keys := b.Keys()
for key := range keys {
if bytes.Equal(prefix, key[:len(prefix)]) {
if err := f([]byte(key)); err != nil {
return err
}
}
}
return nil
return
}
// Len returns the total number of keys in the database
func (b *Bitcask) Len() int {
return b.keydir.Len()
b.mu.RLock()
defer b.mu.RUnlock()
return b.trie.Size()
}
// Keys returns all keys in the database as a channel of string(s)
// Keys returns all keys in the database as a channel of keys
func (b *Bitcask) Keys() chan []byte {
return b.keydir.Keys()
ch := make(chan []byte)
go func() {
b.mu.RLock()
defer b.mu.RUnlock()
for it := b.trie.Iterator(); it.HasNext(); {
node, _ := it.Next()
// Skip the root node
if len(node.Key()) == 0 {
continue
}
ch <- node.Key()
}
close(ch)
}()
return ch
}
// Fold iterates over all keys in the database calling the function `f` for
// each key. If the function returns an error, no further keys are processed
// and the error returned.
func (b *Bitcask) Fold(f func(key []byte) error) error {
for key := range b.keydir.Keys() {
if err := f(key); err != nil {
return err
}
b.mu.RLock()
defer b.mu.RUnlock()
b.trie.ForEach(func(node art.Node) bool {
if err := f(node.Key()); err != nil {
return false
}
return true
})
return nil
}
@@ -314,22 +342,17 @@ func (b *Bitcask) reopen() error {
datafiles[id] = df
}
keydir := internal.NewKeydir()
var t art.Tree
if b.config.greedyScan {
t = art.New()
}
t := art.New()
if internal.Exists(path.Join(b.path, "index")) {
if err := keydir.Load(path.Join(b.path, "index")); err != nil {
f, err := os.Open(path.Join(b.path, "index"))
if err != nil {
return err
}
for key := range keydir.Keys() {
item, _ := keydir.Get(key)
if b.config.greedyScan {
t.Insert(key, item)
}
defer f.Close()
if err := internal.ReadIndex(f, t); err != nil {
return err
}
} else {
for i, df := range datafiles {
@@ -344,17 +367,15 @@ func (b *Bitcask) reopen() error {
// Tombstone value (deleted key)
if len(e.Value) == 0 {
keydir.Delete(e.Key)
t.Delete(e.Key)
continue
}
item := keydir.Add(e.Key, ids[i], e.Offset, n)
if b.config.greedyScan {
item := internal.Item{ids[i], e.Offset, n}
t.Insert(e.Key, item)
}
}
}
}
var id int
if len(ids) > 0 {
@@ -366,15 +387,10 @@ func (b *Bitcask) reopen() error {
return err
}
b.trie = t
b.curr = curr
b.datafiles = datafiles
b.keydir = keydir
if b.config.greedyScan {
b.trie = t
}
return nil
}
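
The public surface (Put, Get, Scan, Keys, Fold, Has, Len) keeps its signatures; only the backing index changes. A hedged sketch of a caller exercising the prefix scan that now walks the trie directly — bitcask.Open and the example path are assumptions, not part of this diff:

package main

import (
	"fmt"
	"log"

	"github.com/prologic/bitcask"
)

func main() {
	// bitcask.Open and the path are assumptions for this sketch; the
	// constructor is not touched by this change.
	db, err := bitcask.Open("/tmp/bitcask-scan-example")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	_ = db.Put([]byte("user:1"), []byte("alice"))
	_ = db.Put([]byte("user:2"), []byte("bob"))
	_ = db.Put([]byte("order:1"), []byte("widget"))

	// Scan now walks the trie by prefix (ForEachPrefix) instead of
	// ranging over every key and comparing prefixes by hand.
	err = db.Scan([]byte("user:"), func(key []byte) error {
		fmt.Printf("scanned: %s\n", key)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}

	// Keys still streams every key over a channel, now fed by a trie iterator.
	for key := range db.Keys() {
		fmt.Printf("key: %s\n", key)
	}
}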

go.mod

@@ -3,7 +3,6 @@ module github.com/prologic/bitcask
go 1.12
require (
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d
github.com/gofrs/flock v0.7.1
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/magiconair/properties v1.8.1 // indirect

go.sum

@@ -20,8 +20,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d h1:TocZO8frNoxkwqFPePHFldSw8vLu+gBrlvFZYWqxiF4=
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d/go.mod h1:D6ICZm05D9VN1n/8iOtBxLpXtoGp6HDFUJ1RNVieOSE=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=

internal/codec_index.go (new file)

@@ -0,0 +1,110 @@
package internal
import (
"encoding/binary"
"io"
art "github.com/plar/go-adaptive-radix-tree"
)
const (
Int32Size = 4
Int64Size = 8
FileIDSize = Int32Size
OffsetSize = Int64Size
SizeSize = Int64Size
)
func ReadBytes(r io.Reader) ([]byte, error) {
s := make([]byte, Int32Size)
_, err := io.ReadFull(r, s)
if err != nil {
return nil, err
}
size := binary.BigEndian.Uint32(s)
b := make([]byte, size)
_, err = io.ReadFull(r, b)
if err != nil {
return nil, err
}
return b, nil
}
func WriteBytes(b []byte, w io.Writer) (int, error) {
s := make([]byte, Int32Size)
binary.BigEndian.PutUint32(s, uint32(len(b)))
n, err := w.Write(s)
if err != nil {
return n, err
}
m, err := w.Write(b)
if err != nil {
return (n + m), err
}
return (n + m), nil
}
func ReadItem(r io.Reader) (Item, error) {
buf := make([]byte, (FileIDSize + OffsetSize + SizeSize))
_, err := io.ReadFull(r, buf)
if err != nil {
return Item{}, err
}
return Item{
FileID: int(binary.BigEndian.Uint32(buf[:FileIDSize])),
Offset: int64(binary.BigEndian.Uint64(buf[FileIDSize:(FileIDSize + OffsetSize)])),
Size: int64(binary.BigEndian.Uint64(buf[(FileIDSize + OffsetSize):])),
}, nil
}
func WriteItem(item Item, w io.Writer) (int, error) {
buf := make([]byte, (FileIDSize + OffsetSize + SizeSize))
binary.BigEndian.PutUint32(buf[:FileIDSize], uint32(item.FileID))
binary.BigEndian.PutUint64(buf[FileIDSize:(FileIDSize+OffsetSize)], uint64(item.Offset))
binary.BigEndian.PutUint64(buf[(FileIDSize+OffsetSize):], uint64(item.Size))
n, err := w.Write(buf)
if err != nil {
return 0, err
}
return n, nil
}
func ReadIndex(r io.Reader, t art.Tree) error {
for {
key, err := ReadBytes(r)
if err != nil {
if err == io.EOF {
break
}
return err
}
item, err := ReadItem(r)
if err != nil {
return err
}
t.Insert(key, item)
}
return nil
}
func WriteIndex(t art.Tree, w io.Writer) (err error) {
t.ForEach(func(node art.Node) bool {
_, err = WriteBytes(node.Key(), w)
if err != nil {
return false
}
item := node.Value().(Item)
_, err := WriteItem(item, w)
if err != nil {
return false
}
return true
})
return
}
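
The new index codec writes a flat stream of entries: a 4-byte big-endian key length, the key bytes, then a fixed 20-byte item (4-byte FileID, 8-byte Offset, 8-byte Size). A sketch of a round trip through WriteIndex and ReadIndex, written as it might appear in a test alongside the internal package:

package internal_test

import (
	"bytes"
	"fmt"

	art "github.com/plar/go-adaptive-radix-tree"

	"github.com/prologic/bitcask/internal"
)

// ExampleWriteIndex round-trips a small trie through the new index codec.
func ExampleWriteIndex() {
	t := art.New()
	t.Insert(art.Key("bar"), internal.Item{FileID: 1, Offset: 42, Size: 16})
	t.Insert(art.Key("foo"), internal.Item{FileID: 0, Offset: 0, Size: 24})

	// WriteIndex walks the trie and emits one length-prefixed key plus a
	// fixed-size item per entry.
	var buf bytes.Buffer
	if err := internal.WriteIndex(t, &buf); err != nil {
		panic(err)
	}

	// ReadIndex consumes entries until EOF and rebuilds the trie.
	t2 := art.New()
	if err := internal.ReadIndex(&buf, t2); err != nil {
		panic(err)
	}

	fmt.Println(t2.Size())
	// Output: 2
}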

internal/item.go (new file)

@ -0,0 +1,7 @@
package internal
type Item struct {
FileID int `json:"fileid"`
Offset int64 `json:"offset"`
Size int64 `json:"size"`
}


@@ -1,129 +0,0 @@
package internal
import (
"bytes"
"encoding/gob"
"io"
"io/ioutil"
"os"
"sync"
)
type Item struct {
FileID int
Offset int64
Size int64
}
type Keydir struct {
sync.RWMutex
keys map[uint64][]byte
items map[uint64]Item
}
func NewKeydir() *Keydir {
return &Keydir{
keys: make(map[uint64][]byte),
items: make(map[uint64]Item),
}
}
func (k *Keydir) Add(key []byte, fileid int, offset, size int64) Item {
item := Item{
FileID: fileid,
Offset: offset,
Size: size,
}
hash := Hash(key)
k.Lock()
k.keys[hash] = key
k.items[hash] = item
k.Unlock()
return item
}
func (k *Keydir) Get(key []byte) (Item, bool) {
k.RLock()
item, ok := k.items[Hash(key)]
k.RUnlock()
return item, ok
}
func (k *Keydir) Delete(key []byte) {
hash := Hash(key)
k.Lock()
delete(k.keys, hash)
delete(k.items, hash)
k.Unlock()
}
func (k *Keydir) Len() int {
return len(k.keys)
}
func (k *Keydir) Keys() chan []byte {
ch := make(chan []byte)
go func() {
k.RLock()
for _, key := range k.keys {
ch <- key
}
close(ch)
k.RUnlock()
}()
return ch
}
func (k *Keydir) Bytes() ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(k.keys); err != nil {
return nil, err
}
if err := enc.Encode(k.items); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (k *Keydir) Load(fn string) error {
f, err := os.Open(fn)
if err != nil {
return err
}
defer f.Close()
dec := gob.NewDecoder(f)
if err := dec.Decode(&k.keys); err != nil {
return err
}
if err := dec.Decode(&k.items); err != nil {
return err
}
return nil
}
func (k *Keydir) Save(fn string) error {
data, err := k.Bytes()
if err != nil {
return err
}
return ioutil.WriteFile(fn, data, 0644)
}
func NewKeydirFromBytes(r io.Reader) (*Keydir, error) {
k := NewKeydir()
dec := gob.NewDecoder(r)
if err := dec.Decode(&k.keys); err != nil {
return nil, err
}
if err := dec.Decode(&k.items); err != nil {
return nil, err
}
return k, nil
}


@@ -9,20 +9,6 @@ import (
"strings"
)
const (
offset64 = 14695981039346656037
prime64 = 1099511628211
)
func Hash(key []byte) uint64 {
var s uint64 = offset64
for _, c := range key {
s ^= uint64(c)
s *= prime64
}
return s
}
func Exists(path string) bool {
_, err := os.Stat(path)
return err == nil


@@ -25,7 +25,6 @@ type config struct {
maxKeySize int
maxValueSize int
sync bool
greedyScan bool
}
func (c *config) MarshalJSON() ([]byte, error) {
@@ -34,13 +33,11 @@ func (c *config) MarshalJSON() ([]byte, error) {
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
Sync bool `json:"sync"`
GreedyScan bool `json:"greedy_scan"`
}{
MaxDatafileSize: c.maxDatafileSize,
MaxKeySize: c.maxKeySize,
MaxValueSize: c.maxValueSize,
Sync: c.sync,
GreedyScan: c.greedyScan,
})
}
@@ -50,7 +47,6 @@ func getConfig(path string) (*config, error) {
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
Sync bool `json:"sync"`
GreedyScan bool `json:"greedy_scan"`
}
var cfg Config
@@ -69,7 +65,6 @@ func getConfig(path string) (*config, error) {
maxKeySize: cfg.MaxKeySize,
maxValueSize: cfg.MaxValueSize,
sync: cfg.Sync,
greedyScan: cfg.GreedyScan,
}, nil
}
@@ -113,12 +108,3 @@ func WithSync(sync bool) Option {
return nil
}
}
// WithGreedyScan enables faster Scan performance.
// This is disabled by default because it causes high memory usage.
func WithGreedyScan(enabled bool) Option {
return func(cfg *config) error {
cfg.greedyScan = enabled
return nil
}
}
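
With greedy_scan dropped, the persisted config carries only the remaining four fields. A small sketch mirroring the anonymous struct in MarshalJSON above, just to show the resulting JSON shape; the max_datafile_size tag and the concrete values are illustrative assumptions:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Mirrors the anonymous struct in config.MarshalJSON after this change;
	// greedy_scan no longer appears. Tag for max_datafile_size and the
	// values below are illustrative assumptions.
	cfg := struct {
		MaxDatafileSize int  `json:"max_datafile_size"`
		MaxKeySize      int  `json:"max_key_size"`
		MaxValueSize    int  `json:"max_value_size"`
		Sync            bool `json:"sync"`
	}{
		MaxDatafileSize: 1 << 20,
		MaxKeySize:      64,
		MaxValueSize:    1 << 16,
		Sync:            false,
	}

	out, _ := json.Marshal(cfg)
	fmt.Println(string(out))
	// {"max_datafile_size":1048576,"max_key_size":64,"max_value_size":65536,"sync":false}
}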