diff --git a/bitcask.go b/bitcask.go index c923f32..4fc721c 100644 --- a/bitcask.go +++ b/bitcask.go @@ -1,7 +1,6 @@ package bitcask import ( - "bytes" "encoding/json" "errors" "hash/crc32" @@ -13,8 +12,7 @@ import ( "sync" "github.com/gofrs/flock" - "github.com/plar/go-adaptive-radix-tree" - + art "github.com/plar/go-adaptive-radix-tree" "github.com/prologic/bitcask/internal" ) @@ -51,7 +49,6 @@ type Bitcask struct { options []Option path string curr *internal.Datafile - keydir *internal.Keydir datafiles map[int]*internal.Datafile trie art.Tree } @@ -74,7 +71,9 @@ func (b *Bitcask) Stats() (stats Stats, err error) { } stats.Datafiles = len(b.datafiles) - stats.Keys = b.keydir.Len() + b.mu.RLock() + stats.Keys = b.trie.Size() + b.mu.RUnlock() stats.Size = size return @@ -89,7 +88,16 @@ func (b *Bitcask) Close() error { os.Remove(b.Flock.Path()) }() - if err := b.keydir.Save(path.Join(b.path, "index")); err != nil { + f, err := os.OpenFile(filepath.Join(b.path, "index"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return err + } + defer f.Close() + + if err := internal.WriteIndex(b.trie, f); err != nil { + return err + } + if err := f.Sync(); err != nil { return err } @@ -112,11 +120,15 @@ func (b *Bitcask) Sync() error { func (b *Bitcask) Get(key []byte) ([]byte, error) { var df *internal.Datafile - item, ok := b.keydir.Get(key) - if !ok { + b.mu.RLock() + value, found := b.trie.Search(key) + b.mu.RUnlock() + if !found { return nil, ErrKeyNotFound } + item := value.(internal.Item) + if item.FileID == b.curr.FileID() { df = b.curr } else { @@ -138,8 +150,10 @@ func (b *Bitcask) Get(key []byte) ([]byte, error) { // Has returns true if the key exists in the database, false otherwise. func (b *Bitcask) Has(key []byte) bool { - _, ok := b.keydir.Get(key) - return ok + b.mu.RLock() + _, found := b.trie.Search(key) + b.mu.RUnlock() + return found } // Put stores the key and value in the database. 
@@ -162,11 +176,10 @@ func (b *Bitcask) Put(key, value []byte) error { } } - item := b.keydir.Add(key, b.curr.FileID(), offset, n) - - if b.config.greedyScan { - b.trie.Insert(key, item) - } + item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n} + b.mu.Lock() + b.trie.Insert(key, item) + b.mu.Unlock() return nil } @@ -179,11 +192,9 @@ func (b *Bitcask) Delete(key []byte) error { return err } - b.keydir.Delete(key) - - if b.config.greedyScan { - b.trie.Delete(key) - } + b.mu.Lock() + b.trie.Delete(key) + b.mu.Unlock() return nil } @@ -191,48 +202,65 @@ func (b *Bitcask) Delete(key []byte) error { // Scan performs a prefix scan of keys matching the given prefix and calling // the function `f` with the keys found. If the function returns an error // no further keys are processed and the first error returned. -func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) error { - if b.config.greedyScan { - b.trie.ForEachPrefix(prefix, func(node art.Node) bool { - if err := f(node.Key()); err != nil { - return false - } +func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) { + b.trie.ForEachPrefix(prefix, func(node art.Node) bool { + // Skip the root node + if len(node.Key()) == 0 { return true - }) - return nil - } - - keys := b.Keys() - for key := range keys { - if bytes.Equal(prefix, key[:len(prefix)]) { - if err := f([]byte(key)); err != nil { - return err - } } - } - return nil + if err = f(node.Key()); err != nil { + return false + } + return true + }) + return } // Len returns the total number of keys in the database func (b *Bitcask) Len() int { - return b.keydir.Len() + b.mu.RLock() + defer b.mu.RUnlock() + return b.trie.Size() } -// Keys returns all keys in the database as a channel of string(s) +// Keys returns all keys in the database as a channel of keys func (b *Bitcask) Keys() chan []byte { - return b.keydir.Keys() + ch := make(chan []byte) + go func() { + b.mu.RLock() + defer b.mu.RUnlock() + + for it := b.trie.Iterator(); 
it.HasNext(); { + node, _ := it.Next() + + // Skip the root node + if len(node.Key()) == 0 { + continue + } + + ch <- node.Key() + } + close(ch) + }() + + return ch } // Fold iterates over all keys in the database calling the function `f` for // each key. If the function returns an error, no further keys are processed // and the error returned. func (b *Bitcask) Fold(f func(key []byte) error) error { - for key := range b.keydir.Keys() { - if err := f(key); err != nil { - return err + b.mu.RLock() + defer b.mu.RUnlock() + + b.trie.ForEach(func(node art.Node) bool { + if err := f(node.Key()); err != nil { + return false } - } + return true + }) + return nil } @@ -314,22 +342,17 @@ func (b *Bitcask) reopen() error { datafiles[id] = df } - keydir := internal.NewKeydir() - - var t art.Tree - if b.config.greedyScan { - t = art.New() - } + t := art.New() if internal.Exists(path.Join(b.path, "index")) { - if err := keydir.Load(path.Join(b.path, "index")); err != nil { + f, err := os.Open(path.Join(b.path, "index")) + if err != nil { return err } - for key := range keydir.Keys() { - item, _ := keydir.Get(key) - if b.config.greedyScan { - t.Insert(key, item) - } + defer f.Close() + + if err := internal.ReadIndex(f, t); err != nil { + return err + } } else { for i, df := range datafiles { @@ -344,14 +367,12 @@ func (b *Bitcask) reopen() error { // Tombstone value (deleted key) if len(e.Value) == 0 { - keydir.Delete(e.Key) + t.Delete(e.Key) continue } - item := keydir.Add(e.Key, ids[i], e.Offset, n) - if b.config.greedyScan { - t.Insert(e.Key, item) - } + item := internal.Item{FileID: ids[i], Offset: e.Offset, Size: n} + t.Insert(e.Key, item) } } } @@ -366,15 +387,10 @@ func (b *Bitcask) reopen() error { return err } + b.trie = t b.curr = curr b.datafiles = datafiles - b.keydir = keydir - - if b.config.greedyScan { - b.trie = t - } - return nil } diff --git a/go.mod b/go.mod index 6fa6e77..5927ab5 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/prologic/bitcask go 1.12 require ( - 
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d github.com/gofrs/flock v0.7.1 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/magiconair/properties v1.8.1 // indirect diff --git a/go.sum b/go.sum index 869781d..6747d24 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d h1:TocZO8frNoxkwqFPePHFldSw8vLu+gBrlvFZYWqxiF4= -github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d/go.mod h1:D6ICZm05D9VN1n/8iOtBxLpXtoGp6HDFUJ1RNVieOSE= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= diff --git a/internal/codec_index.go b/internal/codec_index.go new file mode 100644 index 0000000..d639d02 --- /dev/null +++ b/internal/codec_index.go @@ -0,0 +1,110 @@ +package internal + +import ( + "encoding/binary" + "io" + + art "github.com/plar/go-adaptive-radix-tree" +) + +const ( + Int32Size = 4 + Int64Size = 8 + FileIDSize = Int32Size + OffsetSize = Int64Size + SizeSize = Int64Size +) + +func ReadBytes(r io.Reader) ([]byte, error) { + s := make([]byte, Int32Size) + _, err := io.ReadFull(r, s) + if err != nil { + return nil, err + } + size := binary.BigEndian.Uint32(s) + b := make([]byte, size) + _, err = io.ReadFull(r, b) + if err != nil { + return nil, err + } + return b, nil +} + +func WriteBytes(b []byte, w io.Writer) (int, error) { + s := make([]byte, Int32Size) + 
binary.BigEndian.PutUint32(s, uint32(len(b))) + n, err := w.Write(s) + if err != nil { + return n, err + } + m, err := w.Write(b) + if err != nil { + return (n + m), err + } + return (n + m), nil +} + +func ReadItem(r io.Reader) (Item, error) { + buf := make([]byte, (FileIDSize + OffsetSize + SizeSize)) + _, err := io.ReadFull(r, buf) + if err != nil { + return Item{}, err + } + + return Item{ + FileID: int(binary.BigEndian.Uint32(buf[:FileIDSize])), + Offset: int64(binary.BigEndian.Uint64(buf[FileIDSize:(FileIDSize + OffsetSize)])), + Size: int64(binary.BigEndian.Uint64(buf[(FileIDSize + OffsetSize):])), + }, nil +} + +func WriteItem(item Item, w io.Writer) (int, error) { + buf := make([]byte, (FileIDSize + OffsetSize + SizeSize)) + binary.BigEndian.PutUint32(buf[:FileIDSize], uint32(item.FileID)) + binary.BigEndian.PutUint64(buf[FileIDSize:(FileIDSize+OffsetSize)], uint64(item.Offset)) + binary.BigEndian.PutUint64(buf[(FileIDSize+OffsetSize):], uint64(item.Size)) + n, err := w.Write(buf) + if err != nil { + return 0, err + } + return n, nil +} + +func ReadIndex(r io.Reader, t art.Tree) error { + for { + key, err := ReadBytes(r) + if err != nil { + if err == io.EOF { + break + } + return err + } + + item, err := ReadItem(r) + if err != nil { + return err + } + + t.Insert(key, item) + } + + return nil +} + +func WriteIndex(t art.Tree, w io.Writer) (err error) { + t.ForEach(func(node art.Node) bool { + _, err = WriteBytes(node.Key(), w) + if err != nil { + return false + } + + item := node.Value().(Item) + _, err := WriteItem(item, w) + if err != nil { + return false + } + + return true + }) + return +} diff --git a/internal/item.go b/internal/item.go new file mode 100644 index 0000000..61bbabf --- /dev/null +++ b/internal/item.go @@ -0,0 +1,7 @@ +package internal + +type Item struct { + FileID int `json:"fileid"` + Offset int64 `json:"offset"` + Size int64 `json:"size"` +} diff --git a/internal/keydir.go b/internal/keydir.go deleted file mode 100644 index 
ae9d070..0000000 --- a/internal/keydir.go +++ /dev/null @@ -1,129 +0,0 @@ -package internal - -import ( - "bytes" - "encoding/gob" - "io" - "io/ioutil" - "os" - "sync" -) - -type Item struct { - FileID int - Offset int64 - Size int64 -} - -type Keydir struct { - sync.RWMutex - keys map[uint64][]byte - items map[uint64]Item -} - -func NewKeydir() *Keydir { - return &Keydir{ - keys: make(map[uint64][]byte), - items: make(map[uint64]Item), - } -} - -func (k *Keydir) Add(key []byte, fileid int, offset, size int64) Item { - item := Item{ - FileID: fileid, - Offset: offset, - Size: size, - } - - hash := Hash(key) - - k.Lock() - k.keys[hash] = key - k.items[hash] = item - k.Unlock() - - return item -} - -func (k *Keydir) Get(key []byte) (Item, bool) { - k.RLock() - item, ok := k.items[Hash(key)] - k.RUnlock() - return item, ok -} - -func (k *Keydir) Delete(key []byte) { - hash := Hash(key) - k.Lock() - delete(k.keys, hash) - delete(k.items, hash) - k.Unlock() -} - -func (k *Keydir) Len() int { - return len(k.keys) -} - -func (k *Keydir) Keys() chan []byte { - ch := make(chan []byte) - go func() { - k.RLock() - for _, key := range k.keys { - ch <- key - } - close(ch) - k.RUnlock() - }() - return ch -} - -func (k *Keydir) Bytes() ([]byte, error) { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - if err := enc.Encode(k.keys); err != nil { - return nil, err - } - if err := enc.Encode(k.items); err != nil { - return nil, err - } - return buf.Bytes(), nil -} - -func (k *Keydir) Load(fn string) error { - f, err := os.Open(fn) - if err != nil { - return err - } - defer f.Close() - - dec := gob.NewDecoder(f) - if err := dec.Decode(&k.keys); err != nil { - return err - } - if err := dec.Decode(&k.items); err != nil { - return err - } - - return nil -} - -func (k *Keydir) Save(fn string) error { - data, err := k.Bytes() - if err != nil { - return err - } - - return ioutil.WriteFile(fn, data, 0644) -} - -func NewKeydirFromBytes(r io.Reader) (*Keydir, error) { - k := NewKeydir() 
- dec := gob.NewDecoder(r) - if err := dec.Decode(&k.keys); err != nil { - return nil, err - } - if err := dec.Decode(&k.items); err != nil { - return nil, err - } - return k, nil -} diff --git a/internal/utils.go b/internal/utils.go index 07e814c..158c0d1 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -9,20 +9,6 @@ import ( "strings" ) -const ( - offset64 = 14695981039346656037 - prime64 = 1099511628211 -) - -func Hash(key []byte) uint64 { - var s uint64 = offset64 - for _, c := range key { - s ^= uint64(c) - s *= prime64 - } - return s -} - func Exists(path string) bool { _, err := os.Stat(path) return err == nil diff --git a/options.go b/options.go index 74770bb..edd7f67 100644 --- a/options.go +++ b/options.go @@ -25,7 +25,6 @@ type config struct { maxKeySize int maxValueSize int sync bool - greedyScan bool } func (c *config) MarshalJSON() ([]byte, error) { @@ -34,13 +33,11 @@ func (c *config) MarshalJSON() ([]byte, error) { MaxKeySize int `json:"max_key_size"` MaxValueSize int `json:"max_value_size"` Sync bool `json:"sync"` - GreedyScan bool `json:"greedy_scan"` }{ MaxDatafileSize: c.maxDatafileSize, MaxKeySize: c.maxKeySize, MaxValueSize: c.maxValueSize, Sync: c.sync, - GreedyScan: c.greedyScan, }) } @@ -50,7 +47,6 @@ func getConfig(path string) (*config, error) { MaxKeySize int `json:"max_key_size"` MaxValueSize int `json:"max_value_size"` Sync bool `json:"sync"` - GreedyScan bool `json:"greedy_scan"` } var cfg Config @@ -69,7 +65,6 @@ func getConfig(path string) (*config, error) { maxKeySize: cfg.MaxKeySize, maxValueSize: cfg.MaxValueSize, sync: cfg.Sync, - greedyScan: cfg.GreedyScan, }, nil } @@ -113,12 +108,3 @@ func WithSync(sync bool) Option { return nil } } - -// WithGreedyScan enables faster Scan performance. -// This is disabled by default because it causes high memory usage. -func WithGreedyScan(enabled bool) Option { - return func(cfg *config) error { - cfg.greedyScan = enabled - return nil - } -}