mirror of
https://github.com/taigrr/bitcask
synced 2025-01-18 04:03:17 -08:00
* Add configuration options for FileMode. Adds two configuration values and their corresponding defaults:
  * DirFileModeBeforeUmask - the FileMode used on all directories created. DefaultDirFileModeBeforeUmask is 0700.
  * FileFileModeBeforeUmask - the FileMode used on all files created, except for the "lock" file (managed by the Flock library). DefaultFileFileModeBeforeUmask is 0600.
  Keep in mind that these FileMode values are set BEFORE any umask rules are applied. For example, if the user's umask is 022, setting DirFileModeBeforeUmask to 0777 will result in directories with FileMode 0755 (the umask strips the write bit from the group and world permissions).
* Move defer statements after error checks; use the os.ModePerm constant instead of os.FileMode(777).
* Fix spelling/grammar.
* Skip these tests on Windows, as they appear to break there - Windows is less POSIX-y than it claims.
* Ignore the "lock" file in the default case too - this was incorrectly passing before, as my local dev station has umask 022.
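
A rough sketch of how these values interact with the umask (this assumes option functions named WithDirFileModeBeforeUmask and WithFileFileModeBeforeUmask corresponding to the new configuration values; the effective mode is mode &^ umask):

	// With a process umask of 022, directories come out 0755 and files 0644.
	db, err := bitcask.Open("./data",
		bitcask.WithDirFileModeBeforeUmask(os.FileMode(0777)),  // 0777 &^ 0022 = 0755
		bitcask.WithFileFileModeBeforeUmask(os.FileMode(0666)), // 0666 &^ 0022 = 0644
	)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
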
542 lines
12 KiB
Go
package bitcask

import (
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"io/ioutil"
	"os"
	"path"
	"path/filepath"
	"sort"
	"sync"

	"github.com/gofrs/flock"
	art "github.com/plar/go-adaptive-radix-tree"
	"github.com/prologic/bitcask/internal"
	"github.com/prologic/bitcask/internal/config"
	"github.com/prologic/bitcask/internal/data"
	"github.com/prologic/bitcask/internal/index"
)

var (
	// ErrKeyNotFound is the error returned when a key is not found
	ErrKeyNotFound = errors.New("error: key not found")

	// ErrKeyTooLarge is the error returned for a key that exceeds the
	// maximum allowed key size (configured with WithMaxKeySize).
	ErrKeyTooLarge = errors.New("error: key too large")

	// ErrEmptyKey is the error returned for a value with an empty key.
	ErrEmptyKey = errors.New("error: empty key")

	// ErrValueTooLarge is the error returned for a value that exceeds the
	// maximum allowed value size (configured with WithMaxValueSize).
	ErrValueTooLarge = errors.New("error: value too large")

	// ErrChecksumFailed is the error returned if a key/value retrieved does
	// not match its CRC checksum
	ErrChecksumFailed = errors.New("error: checksum failed")

	// ErrDatabaseLocked is the error returned if the database is locked
	// (typically opened by another process)
	ErrDatabaseLocked = errors.New("error: database locked")
)

// Bitcask is a struct that represents an on-disk LSM and WAL data structure
// and an in-memory hash of key/value pairs, as per the Bitcask paper and as
// seen in the Riak database.
type Bitcask struct {
	mu sync.RWMutex

	*flock.Flock

	config    *config.Config
	options   []Option
	path      string
	curr      data.Datafile
	datafiles map[int]data.Datafile
	trie      art.Tree
	indexer   index.Indexer
}

// Stats is a struct returned by Stats() on an open Bitcask instance
type Stats struct {
	Datafiles int
	Keys      int
	Size      int64
}

// Stats returns statistics about the database, including the number of
// data files, the number of keys, and the overall size of the data on disk
func (b *Bitcask) Stats() (stats Stats, err error) {
	if stats.Size, err = internal.DirSize(b.path); err != nil {
		return
	}

	b.mu.RLock()
	stats.Datafiles = len(b.datafiles)
	stats.Keys = b.trie.Size()
	b.mu.RUnlock()

	return
}
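
// A minimal usage sketch (assumes db is a *Bitcask returned by Open):
//
//	stats, err := db.Stats()
//	if err == nil {
//		fmt.Printf("%d keys in %d datafiles (%d bytes on disk)\n",
//			stats.Keys, stats.Datafiles, stats.Size)
//	}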

// Close closes the database and removes the lock. It is important to call
// Close() as this is the only way to clean up the lock held by the open
// database.
func (b *Bitcask) Close() error {
	defer func() {
		b.Flock.Unlock()
		os.Remove(b.Flock.Path())
	}()

	if err := b.indexer.Save(b.trie, filepath.Join(b.path, "index")); err != nil {
		return err
	}

	for _, df := range b.datafiles {
		if err := df.Close(); err != nil {
			return err
		}
	}

	return b.curr.Close()
}

// Sync flushes all buffers to disk, ensuring all data is written
func (b *Bitcask) Sync() error {
	return b.curr.Sync()
}

// Get retrieves the value of the given key. If the key is not found or an
// I/O error occurs, a nil byte slice is returned along with the error.
func (b *Bitcask) Get(key []byte) ([]byte, error) {
	var df data.Datafile

	b.mu.RLock()
	value, found := b.trie.Search(key)
	if !found {
		b.mu.RUnlock()
		return nil, ErrKeyNotFound
	}

	item := value.(internal.Item)

	if item.FileID == b.curr.FileID() {
		df = b.curr
	} else {
		df = b.datafiles[item.FileID]
	}

	e, err := df.ReadAt(item.Offset, item.Size)
	b.mu.RUnlock()
	if err != nil {
		return nil, err
	}

	checksum := crc32.ChecksumIEEE(e.Value)
	if checksum != e.Checksum {
		return nil, ErrChecksumFailed
	}

	return e.Value, nil
}
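
// A minimal read sketch (assumes db is a *Bitcask returned by Open and that
// the key may or may not have been stored):
//
//	value, err := db.Get([]byte("hello"))
//	if err == bitcask.ErrKeyNotFound {
//		// key was never written, or was deleted
//	} else if err == nil {
//		fmt.Printf("value: %s\n", value)
//	}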

// Has returns true if the key exists in the database, false otherwise.
func (b *Bitcask) Has(key []byte) bool {
	b.mu.RLock()
	_, found := b.trie.Search(key)
	b.mu.RUnlock()
	return found
}

// Put stores the key and value in the database.
func (b *Bitcask) Put(key, value []byte) error {
	if len(key) == 0 {
		return ErrEmptyKey
	}
	if uint32(len(key)) > b.config.MaxKeySize {
		return ErrKeyTooLarge
	}
	if uint64(len(value)) > b.config.MaxValueSize {
		return ErrValueTooLarge
	}

	b.mu.Lock()
	offset, n, err := b.put(key, value)
	if err != nil {
		b.mu.Unlock()
		return err
	}

	if b.config.Sync {
		if err := b.curr.Sync(); err != nil {
			b.mu.Unlock()
			return err
		}
	}

	item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
	b.trie.Insert(key, item)
	b.mu.Unlock()

	return nil
}
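
// A minimal write/read/delete round-trip sketch (assumes db is a *Bitcask
// returned by Open, with key and value sizes within the configured limits):
//
//	if err := db.Put([]byte("hello"), []byte("world")); err != nil {
//		log.Fatal(err)
//	}
//	value, _ := db.Get([]byte("hello")) // "world"
//	_ = db.Delete([]byte("hello"))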

// Delete deletes the named key. If the key doesn't exist or an I/O error
// occurs, the error is returned.
func (b *Bitcask) Delete(key []byte) error {
	b.mu.Lock()
	_, _, err := b.put(key, []byte{})
	if err != nil {
		b.mu.Unlock()
		return err
	}
	b.trie.Delete(key)
	b.mu.Unlock()

	return nil
}

// DeleteAll deletes all keys. If an I/O error occurs, the error is returned.
func (b *Bitcask) DeleteAll() (err error) {
	// DeleteAll writes tombstones and replaces the trie, so it needs the
	// write lock rather than the read lock.
	b.mu.Lock()
	defer b.mu.Unlock()

	b.trie.ForEach(func(node art.Node) bool {
		_, _, err = b.put(node.Key(), []byte{})
		return err == nil
	})
	b.trie = art.New()

	return
}

// Scan performs a prefix scan of keys matching the given prefix, calling
// the function `f` for each key found. If the function returns an error,
// no further keys are processed and that first error is returned.
func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
	b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
		// Skip the root node
		if len(node.Key()) == 0 {
			return true
		}

		if err = f(node.Key()); err != nil {
			return false
		}
		return true
	})
	return
}
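
// A minimal prefix-scan sketch (assumes db is a *Bitcask returned by Open;
// the "user:" key prefix is illustrative only):
//
//	err := db.Scan([]byte("user:"), func(key []byte) error {
//		fmt.Printf("matched key: %s\n", key)
//		return nil // returning a non-nil error here stops the scan
//	})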

// Len returns the total number of keys in the database
func (b *Bitcask) Len() int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.trie.Size()
}

// Keys returns all keys in the database as a channel of keys
func (b *Bitcask) Keys() chan []byte {
	ch := make(chan []byte)
	go func() {
		b.mu.RLock()
		defer b.mu.RUnlock()

		for it := b.trie.Iterator(); it.HasNext(); {
			node, _ := it.Next()
			ch <- node.Key()
		}
		close(ch)
	}()

	return ch
}
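
// A minimal sketch of draining the keys channel (assumes db is a *Bitcask
// returned by Open). The channel is unbuffered and the goroutine holds the
// read lock until the last key is sent, so consume the whole channel before
// issuing writes:
//
//	for key := range db.Keys() {
//		fmt.Printf("key: %s\n", key)
//	}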

// Fold iterates over all keys in the database, calling the function `f` for
// each key. If the function returns an error, no further keys are processed
// and the error is returned.
func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
	b.mu.RLock()
	defer b.mu.RUnlock()

	b.trie.ForEach(func(node art.Node) bool {
		if err = f(node.Key()); err != nil {
			return false
		}
		return true
	})

	return
}
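
// A minimal fold sketch that counts keys (assumes db is a *Bitcask returned
// by Open; `f` receives only keys, so fetch values with Get if needed):
//
//	var count int
//	err := db.Fold(func(key []byte) error {
//		count++
//		return nil
//	})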

// put writes the key and value to the current datafile, rotating to a new
// datafile first if the current one has reached MaxDatafileSize. It assumes
// the key and value have already been validated by the caller.
func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
	size := b.curr.Size()
	if size >= int64(b.config.MaxDatafileSize) {
		err := b.curr.Close()
		if err != nil {
			return -1, 0, err
		}

		// Reopen the just-closed datafile in read-only mode and keep it
		// available for reads.
		id := b.curr.FileID()

		df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
		if err != nil {
			return -1, 0, err
		}

		b.datafiles[id] = df

		// Open a fresh writable datafile with the next id.
		id = b.curr.FileID() + 1
		curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
		if err != nil {
			return -1, 0, err
		}
		b.curr = curr
	}

	e := internal.NewEntry(key, value)
	return b.curr.Write(e)
}

// Reopen reloads the database from disk: it rescans the datafiles, rebuilds
// or reloads the index, and reopens the current (writable) datafile.
func (b *Bitcask) Reopen() error {
	b.mu.Lock()
	defer b.mu.Unlock()

	datafiles, lastID, err := loadDatafiles(b.path, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
	if err != nil {
		return err
	}
	t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles)
	if err != nil {
		return err
	}

	curr, err := data.NewDatafile(b.path, lastID, false, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
	if err != nil {
		return err
	}

	b.trie = t
	b.curr = curr
	b.datafiles = datafiles

	return nil
}

// Merge merges all datafiles in the database. Stale values for old keys are
// squashed and deleted keys are removed. Duplicate key/value pairs are also
// removed. Call this function periodically to reclaim disk space.
func (b *Bitcask) Merge() error {
	// Temporary merged database path
	temp, err := ioutil.TempDir(b.path, "merge")
	if err != nil {
		return err
	}
	defer os.RemoveAll(temp)

	// Create a merged database
	mdb, err := Open(temp, b.options...)
	if err != nil {
		return err
	}

	// Rewrite all key/value pairs into the merged database.
	// Doing this automatically strips deleted keys and
	// old key/value pairs.
	err = b.Fold(func(key []byte) error {
		value, err := b.Get(key)
		if err != nil {
			return err
		}

		if err := mdb.Put(key, value); err != nil {
			return err
		}

		return nil
	})
	if err != nil {
		return err
	}

	err = mdb.Close()
	if err != nil {
		return err
	}

	// Close the database
	err = b.Close()
	if err != nil {
		return err
	}

	// Remove all data files
	files, err := ioutil.ReadDir(b.path)
	if err != nil {
		return err
	}
	for _, file := range files {
		if !file.IsDir() {
			err := os.RemoveAll(path.Join(b.path, file.Name()))
			if err != nil {
				return err
			}
		}
	}

	// Rename all merged data files
	files, err = ioutil.ReadDir(mdb.path)
	if err != nil {
		return err
	}
	for _, file := range files {
		err := os.Rename(
			path.Join(mdb.path, file.Name()),
			path.Join(b.path, file.Name()),
		)
		if err != nil {
			return err
		}
	}

	// And finally reopen the database
	return b.Reopen()
}
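
// A minimal merge sketch (assumes db is a *Bitcask returned by Open; note
// that Merge closes and reopens the database while it runs, so avoid calling
// it concurrently with reads or writes):
//
//	if err := db.Merge(); err != nil {
//		log.Printf("merge failed: %s", err)
//	}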

// Open opens the database at the given path with optional options.
// Options can be provided with the `WithXXX` functions, each of which
// sets one configuration value.
func Open(path string, options ...Option) (*Bitcask, error) {
	var (
		cfg *config.Config
		err error
	)

	configPath := filepath.Join(path, "config.json")
	if internal.Exists(configPath) {
		cfg, err = config.Load(configPath)
		if err != nil {
			return nil, err
		}
	} else {
		cfg = newDefaultConfig()
	}

	for _, opt := range options {
		if err := opt(cfg); err != nil {
			return nil, err
		}
	}

	if err := os.MkdirAll(path, cfg.DirFileModeBeforeUmask); err != nil {
		return nil, err
	}

	bitcask := &Bitcask{
		Flock:   flock.New(filepath.Join(path, "lock")),
		config:  cfg,
		options: options,
		path:    path,
		indexer: index.NewIndexer(),
	}

	locked, err := bitcask.Flock.TryLock()
	if err != nil {
		return nil, err
	}

	if !locked {
		return nil, ErrDatabaseLocked
	}

	if err := cfg.Save(configPath); err != nil {
		return nil, err
	}

	if cfg.AutoRecovery {
		if err := data.CheckAndRecover(path, cfg); err != nil {
			return nil, fmt.Errorf("recovering database: %s", err)
		}
	}
	if err := bitcask.Reopen(); err != nil {
		return nil, err
	}

	return bitcask, nil
}
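
// A minimal open/close sketch (assumes "./data" as an example path and that
// WithSync is among the available `WithXXX` options):
//
//	db, err := bitcask.Open("./data", bitcask.WithSync(true))
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer db.Close()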

func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64, fileModeBeforeUmask os.FileMode) (datafiles map[int]data.Datafile, lastID int, err error) {
	fns, err := internal.GetDatafiles(path)
	if err != nil {
		return nil, 0, err
	}

	ids, err := internal.ParseIds(fns)
	if err != nil {
		return nil, 0, err
	}

	datafiles = make(map[int]data.Datafile, len(ids))
	for _, id := range ids {
		datafiles[id], err = data.NewDatafile(path, id, true, maxKeySize, maxValueSize, fileModeBeforeUmask)
		if err != nil {
			return
		}
	}
	if len(ids) > 0 {
		lastID = ids[len(ids)-1]
	}
	return
}

func getSortedDatafiles(datafiles map[int]data.Datafile) []data.Datafile {
	out := make([]data.Datafile, len(datafiles))
	idx := 0
	for _, df := range datafiles {
		out[idx] = df
		idx++
	}
	sort.Slice(out, func(i, j int) bool {
		return out[i].FileID() < out[j].FileID()
	})
	return out
}

func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile) (art.Tree, error) {
	t, found, err := indexer.Load(filepath.Join(path, "index"), maxKeySize)
	if err != nil {
		return nil, err
	}
	if !found {
		// No persisted index on disk: rebuild it by replaying every
		// datafile in file-id order.
		sortedDatafiles := getSortedDatafiles(datafiles)
		for _, df := range sortedDatafiles {
			var offset int64
			for {
				e, n, err := df.Read()
				if err != nil {
					if err == io.EOF {
						break
					}
					return nil, err
				}
				// Tombstone value (deleted key)
				if len(e.Value) == 0 {
					t.Delete(e.Key)
					offset += n
					continue
				}
				item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
				t.Insert(e.Key, item)
				offset += n
			}
		}
	}
	return t, nil
}