mirror of
https://github.com/taigrr/bitcask
synced 2025-01-18 04:03:17 -08:00
1108 lines
26 KiB
Go
package bitcask
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"hash/crc32"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/abcum/lcp"
|
|
"github.com/gofrs/flock"
|
|
art "github.com/plar/go-adaptive-radix-tree"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"git.mills.io/prologic/bitcask/v2/internal"
|
|
"git.mills.io/prologic/bitcask/v2/internal/config"
|
|
"git.mills.io/prologic/bitcask/v2/internal/data"
|
|
"git.mills.io/prologic/bitcask/v2/internal/data/codec"
|
|
"git.mills.io/prologic/bitcask/v2/internal/index"
|
|
"git.mills.io/prologic/bitcask/v2/internal/metadata"
|
|
"git.mills.io/prologic/bitcask/v2/scripts/migrations"
|
|
)
|
|
|
|
const (
	// lockfile is the name of the advisory lock file created in the
	// database directory to prevent concurrent opens (see Open/Close).
	lockfile = "lock"
	// ttlIndexFile is the on-disk filename of the persisted TTL index.
	ttlIndexFile = "ttl_index"
)
|
|
|
|
// Bitcask is a struct that represents a on-disk LSM and WAL data structure
// and in-memory hash of key/value pairs as per the Bitcask paper and seen
// in the Riak database.
type Bitcask struct {
	mu sync.RWMutex // guards the mutable state below
	flock *flock.Flock // advisory file lock held while the database is open
	config *config.Config // effective configuration (loaded/default + options)
	options []Option // options passed to Open, retained for merge/reopen
	path string // database directory
	curr data.Datafile // active (writable) datafile
	datafiles map[int]data.Datafile // read-only datafiles keyed by file id
	trie art.Tree // key -> internal.Item (file id, offset, size) index
	indexer index.Indexer // persists/loads the key index
	ttlIndexer index.Indexer // persists/loads the TTL index
	ttlIndex art.Tree // key -> expiry (time.Time) index
	metadata *metadata.MetaData // reclaimable space + index freshness
	isMerging bool // true while a Merge is in flight
}
|
|
|
|
// Stats is a struct returned by Stats() on an open Bitcask instance
type Stats struct {
	Datafiles int // number of datafiles on disk
	Keys int // number of keys in the in-memory index
	Size int64 // total size in bytes of the database directory
}
|
|
|
|
// Stats returns statistics about the database including the number of
|
|
// data files, keys and overall size on disk of the data
|
|
func (b *Bitcask) Stats(ctx context.Context) (stats Stats, err error) {
|
|
if stats.Size, err = internal.DirSize(b.path); err != nil {
|
|
return
|
|
}
|
|
|
|
b.mu.RLock()
|
|
stats.Datafiles = len(b.datafiles)
|
|
stats.Keys = b.trie.Size()
|
|
b.mu.RUnlock()
|
|
|
|
return
|
|
}
|
|
|
|
// Close closes the database and removes the lock. It is important to call
// Close() as this is the only way to cleanup the lock held by the open
// database.
//
// NOTE(review): only a read lock is held here even though close() mutates
// b.metadata.IndexUpToDate — confirm no writers can run concurrently with
// Close.
func (b *Bitcask) Close() error {
	b.mu.RLock()
	defer func() {
		b.mu.RUnlock()
		// Unlock error is deliberately ignored: releasing the lock file is
		// best-effort cleanup.
		b.flock.Unlock()
	}()

	return b.close()
}
|
|
|
|
func (b *Bitcask) close() error {
|
|
if err := b.saveIndexes(); err != nil {
|
|
return err
|
|
}
|
|
|
|
b.metadata.IndexUpToDate = true
|
|
if err := b.saveMetadata(); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, df := range b.datafiles {
|
|
if err := df.Close(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return b.curr.Close()
|
|
}
|
|
|
|
// Sync flushes all buffers to disk ensuring all data is written
|
|
func (b *Bitcask) Sync() error {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
|
|
if err := b.saveMetadata(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return b.curr.Sync()
|
|
}
|
|
|
|
// Get fetches value for a key
|
|
func (b *Bitcask) Get(ctx context.Context, key []byte) ([]byte, error) {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
e, err := b.get(key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return e.Value, nil
|
|
}
|
|
|
|
// Has returns true if the key exists in the database, false otherwise.
|
|
func (b *Bitcask) Has(ctx context.Context, key []byte) bool {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
_, found := b.trie.Search(key)
|
|
if found {
|
|
return !b.isExpired(key)
|
|
}
|
|
return found
|
|
}
|
|
|
|
// Put stores the key and value in the database.
|
|
func (b *Bitcask) Put(ctx context.Context, key, value []byte) error {
|
|
if len(key) == 0 {
|
|
return ErrEmptyKey
|
|
}
|
|
if b.config.MaxKeySize > 0 && uint32(len(key)) > b.config.MaxKeySize {
|
|
return ErrKeyTooLarge
|
|
}
|
|
if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize {
|
|
return ErrValueTooLarge
|
|
}
|
|
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
offset, n, err := b.put(key, value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if b.config.Sync {
|
|
if err := b.curr.Sync(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// in case of successful `put`, IndexUpToDate will be always be false
|
|
b.metadata.IndexUpToDate = false
|
|
|
|
if oldItem, found := b.trie.Search(key); found {
|
|
b.metadata.ReclaimableSpace += oldItem.(internal.Item).Size
|
|
}
|
|
|
|
item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
|
|
b.trie.Insert(key, item)
|
|
|
|
return nil
|
|
}
|
|
|
|
// PutWithTTL stores the key and value in the database with the given TTL
|
|
func (b *Bitcask) PutWithTTL(ctx context.Context, key, value []byte, ttl time.Duration) error {
|
|
if len(key) == 0 {
|
|
return ErrEmptyKey
|
|
}
|
|
if b.config.MaxKeySize > 0 && uint32(len(key)) > b.config.MaxKeySize {
|
|
return ErrKeyTooLarge
|
|
}
|
|
if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize {
|
|
return ErrValueTooLarge
|
|
}
|
|
|
|
expiry := time.Now().Add(ttl)
|
|
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
offset, n, err := b.putWithExpiry(key, value, expiry)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if b.config.Sync {
|
|
if err := b.curr.Sync(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// in case of successful `put`, IndexUpToDate will be always be false
|
|
b.metadata.IndexUpToDate = false
|
|
|
|
if oldItem, found := b.trie.Search(key); found {
|
|
b.metadata.ReclaimableSpace += oldItem.(internal.Item).Size
|
|
}
|
|
|
|
item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
|
|
b.trie.Insert(key, item)
|
|
b.ttlIndex.Insert(key, expiry)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Delete deletes the named key.
|
|
func (b *Bitcask) Delete(key []byte) error {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.delete(key)
|
|
}
|
|
|
|
// delete removes the named key by appending a tombstone (empty value) entry
// and dropping the key from both in-memory indexes. Note that a tombstone is
// written even when the key does not exist; only I/O errors are returned.
// Callers must hold the write lock.
func (b *Bitcask) delete(key []byte) error {
	_, _, err := b.put(key, []byte{})
	if err != nil {
		return err
	}
	// The dead value plus its entry header and key become reclaimable by Merge.
	if item, found := b.trie.Search(key); found {
		b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(key))
	}
	b.trie.Delete(key)
	b.ttlIndex.Delete(key)

	return nil
}
|
|
|
|
// Sift iterates over all keys in the database calling the function `f` for
|
|
// each key. If the KV pair is expired or the function returns true, that key is
|
|
// deleted from the database.
|
|
// If the function returns an error on any key, no further keys are processed, no
|
|
// keys are deleted, and the first error is returned.
|
|
func (b *Bitcask) Sift(ctx context.Context, f func(ctx context.Context, key []byte) (bool, error)) (err error) {
|
|
keysToDelete := art.New()
|
|
|
|
b.mu.RLock()
|
|
b.trie.ForEach(func(node art.Node) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
err = ErrContextDeadlineExceeded
|
|
return false
|
|
default:
|
|
if b.isExpired(node.Key()) {
|
|
keysToDelete.Insert(node.Key(), true)
|
|
return true
|
|
}
|
|
var shouldDelete bool
|
|
if shouldDelete, err = f(ctx, node.Key()); err != nil {
|
|
return false
|
|
} else if shouldDelete {
|
|
keysToDelete.Insert(node.Key(), true)
|
|
}
|
|
return true
|
|
}
|
|
})
|
|
b.mu.RUnlock()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
keysToDelete.ForEach(func(node art.Node) (cont bool) {
|
|
b.delete(node.Key())
|
|
return true
|
|
})
|
|
return
|
|
}
|
|
|
|
// DeleteAll deletes all the keys. If an I/O error occurs the error is returned.
|
|
func (b *Bitcask) DeleteAll() (err error) {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
|
|
b.trie.ForEach(func(node art.Node) bool {
|
|
_, _, err = b.put(node.Key(), []byte{})
|
|
if err != nil {
|
|
return false
|
|
}
|
|
item, _ := b.trie.Search(node.Key())
|
|
b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(node.Key()))
|
|
return true
|
|
})
|
|
b.trie = art.New()
|
|
b.ttlIndex = art.New()
|
|
|
|
return
|
|
}
|
|
|
|
// Scan performs a prefix scan of keys matching the given prefix and calling
|
|
// the function `f` with the keys found. If the function returns an error
|
|
// no further keys are processed and the first error is returned.
|
|
func (b *Bitcask) Scan(ctx context.Context, prefix []byte, f func(ctx context.Context, key []byte) error) (err error) {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
|
|
b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
err = ErrContextDeadlineExceeded
|
|
return false
|
|
default:
|
|
// Skip the root node
|
|
if len(node.Key()) == 0 {
|
|
return true
|
|
}
|
|
|
|
if err = f(ctx, node.Key()); err != nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
})
|
|
return
|
|
}
|
|
|
|
// SiftScan iterates over all keys in the database beginning with the given
|
|
// prefix, calling the function `f` for each key. If the KV pair is expired or
|
|
// the function returns true, that key is deleted from the database.
|
|
// If the function returns an error on any key, no further keys are processed,
|
|
// no keys are deleted, and the first error is returned.
|
|
func (b *Bitcask) SiftScan(ctx context.Context, prefix []byte, f func(key []byte) (bool, error)) (err error) {
|
|
keysToDelete := art.New()
|
|
|
|
b.mu.RLock()
|
|
b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
err = ErrContextDeadlineExceeded
|
|
return false
|
|
default:
|
|
// Skip the root node
|
|
if len(node.Key()) == 0 {
|
|
return true
|
|
}
|
|
if b.isExpired(node.Key()) {
|
|
keysToDelete.Insert(node.Key(), true)
|
|
return true
|
|
}
|
|
var shouldDelete bool
|
|
if shouldDelete, err = f(node.Key()); err != nil {
|
|
return false
|
|
} else if shouldDelete {
|
|
keysToDelete.Insert(node.Key(), true)
|
|
}
|
|
return true
|
|
}
|
|
})
|
|
b.mu.RUnlock()
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
keysToDelete.ForEach(func(node art.Node) (cont bool) {
|
|
b.delete(node.Key())
|
|
return true
|
|
})
|
|
return
|
|
}
|
|
|
|
// Range performs a range scan of keys matching a range of keys between the
|
|
// start key and end key and calling the function `f` with the keys found.
|
|
// If the function returns an error no further keys are processed and the
|
|
// first error returned.
|
|
func (b *Bitcask) Range(ctx context.Context, start, end []byte, f func(ctx context.Context, key []byte) error) (err error) {
|
|
if bytes.Compare(start, end) == 1 {
|
|
return ErrInvalidRange
|
|
}
|
|
|
|
commonPrefix := lcp.LCP(start, end)
|
|
if commonPrefix == nil {
|
|
return ErrInvalidRange
|
|
}
|
|
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
|
|
b.trie.ForEachPrefix(commonPrefix, func(node art.Node) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
err = ErrContextDeadlineExceeded
|
|
return false
|
|
default:
|
|
if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) <= 0 {
|
|
if err = f(ctx, node.Key()); err != nil {
|
|
return false
|
|
}
|
|
return true
|
|
} else if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) > 0 {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
})
|
|
return
|
|
}
|
|
|
|
// SiftRange performs a range scan of keys matching a range of keys between the
// start key and end key and calling the function `f` with the keys found.
// If the KV pair is expired or the function returns true, that key is deleted
// from the database.
// If the function returns an error on any key, no further keys are processed, no
// keys are deleted, and the first error is returned.
func (b *Bitcask) SiftRange(ctx context.Context, start, end []byte, f func(ctx context.Context, key []byte) (bool, error)) (err error) {
	if bytes.Compare(start, end) == 1 {
		return ErrInvalidRange
	}

	// Every key in [start, end] shares this prefix, so only that subtree of
	// the trie needs to be walked.
	commonPrefix := lcp.LCP(start, end)
	if commonPrefix == nil {
		return ErrInvalidRange
	}

	// Phase 1 collects doomed keys under the read lock; phase 2 deletes them
	// under the write lock. Keys are never deleted mid-iteration.
	keysToDelete := art.New()

	b.mu.RLock()
	b.trie.ForEachPrefix(commonPrefix, func(node art.Node) bool {
		select {
		case <-ctx.Done():
			err = ErrContextDeadlineExceeded
			return false
		default:
			if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) <= 0 {
				// Key is inside [start, end].
				if b.isExpired(node.Key()) {
					keysToDelete.Insert(node.Key(), true)
					return true
				}
				var shouldDelete bool
				if shouldDelete, err = f(ctx, node.Key()); err != nil {
					return false
				} else if shouldDelete {
					keysToDelete.Insert(node.Key(), true)
				}
				return true
			} else if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) > 0 {
				// Past the end of the range: stop the walk.
				return false
			}
			// Before the range (includes the trie's root node): keep walking.
			return true
		}
	})
	b.mu.RUnlock()

	if err != nil {
		return
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	keysToDelete.ForEach(func(node art.Node) (cont bool) {
		b.delete(node.Key())
		return true
	})

	return
}
|
|
|
|
// Len returns the total number of keys in the database
|
|
func (b *Bitcask) Len(ctx context.Context) int {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
return b.trie.Size()
|
|
}
|
|
|
|
// Keys returns all keys in the database as a channel of keys
//
// NOTE(review): the read lock is held for the entire iteration and the
// channel is unbuffered — if the consumer stops receiving before the channel
// is drained, this goroutine leaks while holding the lock. Confirm callers
// always drain the channel.
func (b *Bitcask) Keys() chan []byte {
	ch := make(chan []byte)
	go func() {
		b.mu.RLock()
		defer b.mu.RUnlock()

		for it := b.trie.Iterator(); it.HasNext(); {
			node, _ := it.Next()
			// Expired keys are skipped rather than reported.
			if b.isExpired(node.Key()) {
				continue
			}
			ch <- node.Key()
		}
		close(ch)
	}()

	return ch
}
|
|
|
|
// RunGC deletes all expired keys
|
|
func (b *Bitcask) RunGC() error {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.runGC()
|
|
}
|
|
|
|
// runGC deletes all keys that are expired
|
|
// caller function should take care of the locking when calling this method
|
|
func (b *Bitcask) runGC() (err error) {
|
|
keysToDelete := art.New()
|
|
|
|
b.ttlIndex.ForEach(func(node art.Node) (cont bool) {
|
|
if !b.isExpired(node.Key()) {
|
|
// later, return false here when the ttlIndex is sorted
|
|
return true
|
|
}
|
|
keysToDelete.Insert(node.Key(), true)
|
|
//keysToDelete = append(keysToDelete, node.Key())
|
|
return true
|
|
})
|
|
|
|
keysToDelete.ForEach(func(node art.Node) (cont bool) {
|
|
b.delete(node.Key())
|
|
return true
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// Fold iterates over all keys in the database calling the function `f` for
|
|
// each key. If the function returns an error, no further keys are processed
|
|
// and the error is returned.
|
|
func (b *Bitcask) Fold(ctx context.Context, f func(ctx context.Context, key []byte) error) (err error) {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
|
|
b.trie.ForEach(func(node art.Node) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
err = ErrContextDeadlineExceeded
|
|
return false
|
|
default:
|
|
if err = f(ctx, node.Key()); err != nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
})
|
|
|
|
return
|
|
}
|
|
|
|
// get retrieves the value of the given key
|
|
func (b *Bitcask) get(key []byte) (internal.Entry, error) {
|
|
var df data.Datafile
|
|
|
|
value, found := b.trie.Search(key)
|
|
if !found {
|
|
return internal.Entry{}, ErrKeyNotFound
|
|
}
|
|
if b.isExpired(key) {
|
|
return internal.Entry{}, ErrKeyExpired
|
|
}
|
|
|
|
item := value.(internal.Item)
|
|
|
|
if item.FileID == b.curr.FileID() {
|
|
df = b.curr
|
|
} else {
|
|
df = b.datafiles[item.FileID]
|
|
}
|
|
|
|
e, err := df.ReadAt(item.Offset, item.Size)
|
|
if err != nil {
|
|
return internal.Entry{}, err
|
|
}
|
|
|
|
checksum := crc32.ChecksumIEEE(e.Value)
|
|
if checksum != e.Checksum {
|
|
return internal.Entry{}, ErrChecksumFailed
|
|
}
|
|
|
|
return e, nil
|
|
}
|
|
|
|
func (b *Bitcask) maybeRotate() error {
|
|
size := b.curr.Size()
|
|
if size < int64(b.config.MaxDatafileSize) {
|
|
return nil
|
|
}
|
|
|
|
err := b.curr.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
id := b.curr.FileID()
|
|
|
|
df, err := data.NewDatafile(
|
|
b.path, id, true,
|
|
b.config.MaxKeySize,
|
|
b.config.MaxValueSize,
|
|
b.config.FileFileModeBeforeUmask,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b.datafiles[id] = df
|
|
|
|
id = b.curr.FileID() + 1
|
|
curr, err := data.NewDatafile(
|
|
b.path, id, false,
|
|
b.config.MaxKeySize,
|
|
b.config.MaxValueSize,
|
|
b.config.FileFileModeBeforeUmask,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b.curr = curr
|
|
err = b.saveIndexes()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// put inserts a new (key, value). Both key and value are valid inputs.
|
|
func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
|
|
if err := b.maybeRotate(); err != nil {
|
|
return -1, 0, fmt.Errorf("error rotating active datafile: %w", err)
|
|
}
|
|
|
|
return b.curr.Write(internal.NewEntry(key, value, nil))
|
|
}
|
|
|
|
// putWithExpiry inserts a new (key, value, expiry).
|
|
// Both key and value are valid inputs.
|
|
func (b *Bitcask) putWithExpiry(key, value []byte, expiry time.Time) (int64, int64, error) {
|
|
if err := b.maybeRotate(); err != nil {
|
|
return -1, 0, fmt.Errorf("error rotating active datafile: %w", err)
|
|
}
|
|
|
|
return b.curr.Write(internal.NewEntry(key, value, &expiry))
|
|
}
|
|
|
|
// closeCurrentFile closes current datafile and makes it read only.
|
|
func (b *Bitcask) closeCurrentFile() error {
|
|
if err := b.curr.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
id := b.curr.FileID()
|
|
df, err := data.NewDatafile(
|
|
b.path, id, true,
|
|
b.config.MaxKeySize,
|
|
b.config.MaxValueSize,
|
|
b.config.FileFileModeBeforeUmask,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b.datafiles[id] = df
|
|
return nil
|
|
}
|
|
|
|
// openNewWritableFile opens new datafile for writing data
|
|
func (b *Bitcask) openNewWritableFile() error {
|
|
id := b.curr.FileID() + 1
|
|
curr, err := data.NewDatafile(
|
|
b.path, id, false,
|
|
b.config.MaxKeySize,
|
|
b.config.MaxValueSize,
|
|
b.config.FileFileModeBeforeUmask,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b.curr = curr
|
|
return nil
|
|
}
|
|
|
|
// Reopen closes and reopsns the database
|
|
func (b *Bitcask) Reopen() error {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
return b.reopen()
|
|
}
|
|
|
|
// reopen reloads a bitcask object with index and datafiles
|
|
// caller of this method should take care of locking
|
|
func (b *Bitcask) reopen() error {
|
|
datafiles, lastID, err := loadDatafiles(
|
|
b.path,
|
|
b.config.MaxKeySize,
|
|
b.config.MaxValueSize,
|
|
b.config.FileFileModeBeforeUmask,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t, ttlIndex, err := loadIndexes(b, datafiles, lastID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
curr, err := data.NewDatafile(
|
|
b.path, lastID, false,
|
|
b.config.MaxKeySize,
|
|
b.config.MaxValueSize,
|
|
b.config.FileFileModeBeforeUmask,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b.trie = t
|
|
b.curr = curr
|
|
b.ttlIndex = ttlIndex
|
|
b.datafiles = datafiles
|
|
|
|
return nil
|
|
}
|
|
|
|
// Merge merges all datafiles in the database. Old keys are squashed
|
|
// and deleted keys removes. Duplicate key/value pairs are also removed.
|
|
// Call this function periodically to reclaim disk space.
|
|
func (b *Bitcask) Merge() error {
|
|
b.mu.Lock()
|
|
if b.isMerging {
|
|
b.mu.Unlock()
|
|
return ErrMergeInProgress
|
|
}
|
|
b.isMerging = true
|
|
b.mu.Unlock()
|
|
defer func() {
|
|
b.isMerging = false
|
|
}()
|
|
b.mu.RLock()
|
|
err := b.closeCurrentFile()
|
|
if err != nil {
|
|
b.mu.RUnlock()
|
|
return err
|
|
}
|
|
filesToMerge := make([]int, 0, len(b.datafiles))
|
|
for k := range b.datafiles {
|
|
filesToMerge = append(filesToMerge, k)
|
|
}
|
|
err = b.openNewWritableFile()
|
|
if err != nil {
|
|
b.mu.RUnlock()
|
|
return err
|
|
}
|
|
b.mu.RUnlock()
|
|
sort.Ints(filesToMerge)
|
|
|
|
// Temporary merged database path
|
|
temp, err := ioutil.TempDir(b.path, "merge")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer os.RemoveAll(temp)
|
|
|
|
// Create a merged database
|
|
mdb, err := Open(temp, withConfig(b.config))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Rewrite all key/value pairs into merged database
|
|
// Doing this automatically strips deleted keys and
|
|
// old key/value pairs
|
|
err = b.Fold(context.Background(), func(ctx context.Context, key []byte) error {
|
|
item, _ := b.trie.Search(key)
|
|
// if key was updated after start of merge operation, nothing to do
|
|
if item.(internal.Item).FileID > filesToMerge[len(filesToMerge)-1] {
|
|
return nil
|
|
}
|
|
e, err := b.get(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if e.Expiry != nil {
|
|
if err := mdb.PutWithTTL(ctx, key, e.Value, time.Until(*e.Expiry)); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if err := mdb.Put(ctx, key, e.Value); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = mdb.Close(); err != nil {
|
|
return err
|
|
}
|
|
// no reads and writes till we reopen
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if err = b.close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Remove data files
|
|
files, err := ioutil.ReadDir(b.path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, file := range files {
|
|
if file.IsDir() || file.Name() == lockfile {
|
|
continue
|
|
}
|
|
ids, err := internal.ParseIds([]string{file.Name()})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// if datafile was created after start of merge, skip
|
|
if len(ids) > 0 && ids[0] > filesToMerge[len(filesToMerge)-1] {
|
|
continue
|
|
}
|
|
err = os.RemoveAll(path.Join(b.path, file.Name()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Rename all merged data files
|
|
files, err = ioutil.ReadDir(mdb.path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, file := range files {
|
|
// see #225
|
|
if file.Name() == lockfile {
|
|
continue
|
|
}
|
|
err := os.Rename(
|
|
path.Join([]string{mdb.path, file.Name()}...),
|
|
path.Join([]string{b.path, file.Name()}...),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
b.metadata.ReclaimableSpace = 0
|
|
|
|
// And finally reopen the database
|
|
return b.reopen()
|
|
}
|
|
|
|
// Open opens the database at the given path with optional options.
// Options can be provided with the `WithXXX` functions that provide
// configuration options as functions.
func Open(path string, options ...Option) (*Bitcask, error) {
	var (
		cfg *config.Config
		err error
		meta *metadata.MetaData
	)

	// Load a previously-saved config when present, otherwise start from the
	// defaults.
	configPath := filepath.Join(path, "config.json")
	if internal.Exists(configPath) {
		cfg, err = config.Load(configPath)
		if err != nil {
			return nil, &ErrBadConfig{err}
		}
	} else {
		cfg = newDefaultConfig()
	}

	// Migrate on-disk data to the current DB version when required.
	if err := checkAndUpgrade(cfg, configPath); err != nil {
		return nil, err
	}

	// Caller-supplied options override the loaded/default config.
	for _, opt := range options {
		if err := opt(cfg); err != nil {
			return nil, err
		}
	}

	if err := os.MkdirAll(path, cfg.DirFileModeBeforeUmask); err != nil {
		return nil, err
	}

	meta, err = loadMetadata(path)
	if err != nil {
		return nil, &ErrBadMetadata{err}
	}

	bitcask := &Bitcask{
		flock: flock.New(filepath.Join(path, lockfile)),
		config: cfg,
		options: options,
		path: path,
		indexer: index.NewIndexer(),
		ttlIndexer: index.NewTTLIndexer(),
		metadata: meta,
	}

	// Take the advisory lock; a second Open of the same path fails fast
	// with ErrDatabaseLocked instead of corrupting the datafiles.
	ok, err := bitcask.flock.TryLock()
	if err != nil {
		return nil, err
	}

	if !ok {
		return nil, ErrDatabaseLocked
	}

	// Persist the effective (possibly option-modified) config.
	if err := cfg.Save(configPath); err != nil {
		return nil, err
	}

	if cfg.AutoRecovery {
		if err := data.CheckAndRecover(path, cfg); err != nil {
			return nil, fmt.Errorf("recovering database: %s", err)
		}
	}
	// Load datafiles and indexes.
	if err := bitcask.Reopen(); err != nil {
		return nil, err
	}

	return bitcask, nil
}
|
|
|
|
// checkAndUpgrade checks if DB upgrade is required
|
|
// if yes, then applies version upgrade and saves updated config
|
|
func checkAndUpgrade(cfg *config.Config, configPath string) error {
|
|
if cfg.DBVersion == CurrentDBVersion {
|
|
return nil
|
|
}
|
|
if cfg.DBVersion > CurrentDBVersion {
|
|
return ErrInvalidVersion
|
|
}
|
|
// for v0 to v1 upgrade, we need to append 8 null bytes after each encoded entry in datafiles
|
|
if cfg.DBVersion == uint32(0) && CurrentDBVersion == uint32(1) {
|
|
log.Warn("upgrading db version, might take some time....")
|
|
cfg.DBVersion = CurrentDBVersion
|
|
return migrations.ApplyV0ToV1(filepath.Dir(configPath), cfg.MaxDatafileSize)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Backup copies db directory to given path
|
|
// it creates path if it does not exist
|
|
func (b *Bitcask) Backup(path string) error {
|
|
if !internal.Exists(path) {
|
|
if err := os.MkdirAll(path, b.config.DirFileModeBeforeUmask); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return internal.Copy(b.path, path, []string{lockfile})
|
|
}
|
|
|
|
// saveIndex saves index and ttl_index currently in RAM to disk
|
|
func (b *Bitcask) saveIndexes() error {
|
|
tempIdx := "temp_index"
|
|
if err := b.indexer.Save(b.trie, filepath.Join(b.path, tempIdx)); err != nil {
|
|
return err
|
|
}
|
|
if err := os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, "index")); err != nil {
|
|
return err
|
|
}
|
|
if err := b.ttlIndexer.Save(b.ttlIndex, filepath.Join(b.path, tempIdx)); err != nil {
|
|
return err
|
|
}
|
|
return os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, ttlIndexFile))
|
|
}
|
|
|
|
// saveMetadata saves metadata into disk
|
|
func (b *Bitcask) saveMetadata() error {
|
|
return b.metadata.Save(filepath.Join(b.path, "meta.json"), b.config.DirFileModeBeforeUmask)
|
|
}
|
|
|
|
// Reclaimable returns space that can be reclaimed
//
// NOTE(review): reads b.metadata without taking b.mu — confirm callers
// tolerate a stale value under concurrent writes.
func (b *Bitcask) Reclaimable() int64 {
	return b.metadata.ReclaimableSpace
}
|
|
|
|
// isExpired returns true if a key has expired
|
|
// it returns false if key does not exist in ttl index
|
|
func (b *Bitcask) isExpired(key []byte) bool {
|
|
expiry, found := b.ttlIndex.Search(key)
|
|
if !found {
|
|
return false
|
|
}
|
|
return expiry.(time.Time).Before(time.Now().UTC())
|
|
}
|
|
|
|
func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64, fileModeBeforeUmask os.FileMode) (datafiles map[int]data.Datafile, lastID int, err error) {
|
|
fns, err := internal.GetDatafiles(path)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
ids, err := internal.ParseIds(fns)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
datafiles = make(map[int]data.Datafile, len(ids))
|
|
for _, id := range ids {
|
|
datafiles[id], err = data.NewDatafile(
|
|
path, id, true,
|
|
maxKeySize,
|
|
maxValueSize,
|
|
fileModeBeforeUmask,
|
|
)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
}
|
|
if len(ids) > 0 {
|
|
lastID = ids[len(ids)-1]
|
|
}
|
|
return
|
|
}
|
|
|
|
func getSortedDatafiles(datafiles map[int]data.Datafile) []data.Datafile {
|
|
out := make([]data.Datafile, len(datafiles))
|
|
idx := 0
|
|
for _, df := range datafiles {
|
|
out[idx] = df
|
|
idx++
|
|
}
|
|
sort.Slice(out, func(i, j int) bool {
|
|
return out[i].FileID() < out[j].FileID()
|
|
})
|
|
return out
|
|
}
|
|
|
|
// loadIndexes loads index from disk to memory. If index is not available or partially available (last bitcask process crashed)
|
|
// then it iterates over last datafile and construct index
|
|
// we construct ttl_index here also along with normal index
|
|
func loadIndexes(b *Bitcask, datafiles map[int]data.Datafile, lastID int) (art.Tree, art.Tree, error) {
|
|
t, found, err := b.indexer.Load(filepath.Join(b.path, "index"), b.config.MaxKeySize)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
ttlIndex, _, err := b.ttlIndexer.Load(filepath.Join(b.path, ttlIndexFile), b.config.MaxKeySize)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if found && b.metadata.IndexUpToDate {
|
|
return t, ttlIndex, nil
|
|
}
|
|
if found {
|
|
if err := loadIndexFromDatafile(t, ttlIndex, datafiles[lastID]); err != nil {
|
|
return nil, ttlIndex, err
|
|
}
|
|
return t, ttlIndex, nil
|
|
}
|
|
sortedDatafiles := getSortedDatafiles(datafiles)
|
|
for _, df := range sortedDatafiles {
|
|
if err := loadIndexFromDatafile(t, ttlIndex, df); err != nil {
|
|
return nil, ttlIndex, err
|
|
}
|
|
}
|
|
return t, ttlIndex, nil
|
|
}
|
|
|
|
func loadIndexFromDatafile(t art.Tree, ttlIndex art.Tree, df data.Datafile) error {
|
|
var offset int64
|
|
for {
|
|
e, n, err := df.Read()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return err
|
|
}
|
|
// Tombstone value (deleted key)
|
|
if len(e.Value) == 0 {
|
|
t.Delete(e.Key)
|
|
offset += n
|
|
continue
|
|
}
|
|
item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
|
|
t.Insert(e.Key, item)
|
|
if e.Expiry != nil {
|
|
ttlIndex.Insert(e.Key, *e.Expiry)
|
|
}
|
|
offset += n
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func loadMetadata(path string) (*metadata.MetaData, error) {
|
|
if !internal.Exists(filepath.Join(path, "meta.json")) {
|
|
meta := new(metadata.MetaData)
|
|
return meta, nil
|
|
}
|
|
return metadata.Load(filepath.Join(path, "meta.json"))
|
|
}
|