Add support for keys with ttl (#177)

* ttl support first commit

* imports fix

* put api args correction

* put options added

* upgrade method added

* upgrade log added

* v0 to v1 migration script added

* error assertion added

* temp migration dir fix

Co-authored-by: yash <yash.chandra@grabpay.com>
This commit is contained in:
Yash Suresh Chandra
2020-12-21 13:11:43 +05:30
committed by GitHub
parent f397bec88f
commit 5c6ceadac1
13 changed files with 421 additions and 61 deletions

View File

@@ -11,6 +11,7 @@ import (
"path/filepath"
"sort"
"sync"
"time"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/flock"
@@ -20,6 +21,8 @@ import (
"github.com/prologic/bitcask/internal/data/codec"
"github.com/prologic/bitcask/internal/index"
"github.com/prologic/bitcask/internal/metadata"
"github.com/prologic/bitcask/scripts/migrations"
log "github.com/sirupsen/logrus"
)
const (
@@ -34,6 +37,10 @@ var (
// maximum allowed key size (configured with WithMaxKeySize).
ErrKeyTooLarge = errors.New("error: key too large")
// ErrKeyExpired is the error returned when a key is queried which has
// already expired (due to ttl)
ErrKeyExpired = errors.New("error: key expired")
// ErrEmptyKey is the error returned for a value with an empty key.
ErrEmptyKey = errors.New("error: empty key")
@@ -49,6 +56,8 @@ var (
// (typically opened by another process)
ErrDatabaseLocked = errors.New("error: database locked")
ErrInvalidVersion = errors.New("error: invalid db version")
// ErrMergeInProgress is the error returned if merge is called when already a merge
// is in progress
ErrMergeInProgress = errors.New("error: merge already in progress")
@@ -139,42 +148,14 @@ func (b *Bitcask) Sync() error {
return b.curr.Sync()
}
// Get fetches value for given key
// Get fetches value for a key
func (b *Bitcask) Get(key []byte) ([]byte, error) {
b.mu.RLock()
defer b.mu.RUnlock()
return b.get(key)
}
// get retrieves the value of the given key. If the key is not found or an/I/O
// error occurs a null byte slice is returned along with the error.
func (b *Bitcask) get(key []byte) ([]byte, error) {
var df data.Datafile
value, found := b.trie.Search(key)
if !found {
return nil, ErrKeyNotFound
}
item := value.(internal.Item)
if item.FileID == b.curr.FileID() {
df = b.curr
} else {
df = b.datafiles[item.FileID]
}
e, err := df.ReadAt(item.Offset, item.Size)
e, err := b.get(key)
if err != nil {
return nil, err
}
checksum := crc32.ChecksumIEEE(e.Value)
if checksum != e.Checksum {
return nil, ErrChecksumFailed
}
return e.Value, nil
}
@@ -187,7 +168,7 @@ func (b *Bitcask) Has(key []byte) bool {
}
// Put stores the key and value in the database.
func (b *Bitcask) Put(key, value []byte) error {
func (b *Bitcask) Put(key, value []byte, options ...PutOptions) error {
if len(key) == 0 {
return ErrEmptyKey
}
@@ -197,10 +178,16 @@ func (b *Bitcask) Put(key, value []byte) error {
if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize {
return ErrValueTooLarge
}
var feature Feature
for _, opt := range options {
if err := opt(&feature); err != nil {
return err
}
}
b.mu.Lock()
defer b.mu.Unlock()
offset, n, err := b.put(key, value)
offset, n, err := b.put(key, value, feature)
if err != nil {
return err
}
@@ -224,20 +211,24 @@ func (b *Bitcask) Put(key, value []byte) error {
return nil
}
// Delete deletes the named key. If the key doesn't exist or an I/O error
// occurs the error is returned.
// Delete deletes the named key.
func (b *Bitcask) Delete(key []byte) error {
b.mu.Lock()
_, _, err := b.put(key, []byte{})
defer b.mu.Unlock()
return b.delete(key)
}
// delete deletes the named key. If the key doesn't exist or an I/O error
// occurs the error is returned.
func (b *Bitcask) delete(key []byte) error {
_, _, err := b.put(key, []byte{}, Feature{})
if err != nil {
b.mu.Unlock()
return err
}
if item, found := b.trie.Search(key); found {
b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(key))
}
b.trie.Delete(key)
b.mu.Unlock()
return nil
}
@@ -248,7 +239,7 @@ func (b *Bitcask) DeleteAll() (err error) {
defer b.mu.RUnlock()
b.trie.ForEach(func(node art.Node) bool {
_, _, err = b.put(node.Key(), []byte{})
_, _, err = b.put(node.Key(), []byte{}, Feature{})
if err != nil {
return false
}
@@ -320,8 +311,44 @@ func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
return
}
// get retrieves the value of the given key. If the key is not found or an/I/O
// error occurs a null byte slice is returned along with the error.
func (b *Bitcask) get(key []byte) (internal.Entry, error) {
var df data.Datafile
value, found := b.trie.Search(key)
if !found {
return internal.Entry{}, ErrKeyNotFound
}
item := value.(internal.Item)
if item.FileID == b.curr.FileID() {
df = b.curr
} else {
df = b.datafiles[item.FileID]
}
e, err := df.ReadAt(item.Offset, item.Size)
if err != nil {
return internal.Entry{}, err
}
if e.Expiry != nil && e.Expiry.Before(time.Now().UTC()) {
_ = b.delete(key) // we don't care if it doesnt succeed
return internal.Entry{}, ErrKeyExpired
}
checksum := crc32.ChecksumIEEE(e.Value)
if checksum != e.Checksum {
return internal.Entry{}, ErrChecksumFailed
}
return e, nil
}
// put inserts a new (key, value). Both key and value are valid inputs.
func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
func (b *Bitcask) put(key, value []byte, feature Feature) (int64, int64, error) {
size := b.curr.Size()
if size >= int64(b.config.MaxDatafileSize) {
err := b.curr.Close()
@@ -350,7 +377,7 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
}
}
e := internal.NewEntry(key, value)
e := internal.NewEntry(key, value, feature.Expiry)
return b.curr.Write(e)
}
@@ -466,12 +493,17 @@ func (b *Bitcask) Merge() error {
if item.(internal.Item).FileID > filesToMerge[len(filesToMerge)-1] {
return nil
}
value, err := b.get(key)
e, err := b.get(key)
if err != nil {
return err
}
// prepare entry options
var opts []PutOptions
if e.Expiry != nil {
opts = append(opts, WithExpiry(*(e.Expiry)))
}
if err := mdb.Put(key, value); err != nil {
if err := mdb.Put(key, e.Value, opts...); err != nil {
return err
}
@@ -553,6 +585,10 @@ func Open(path string, options ...Option) (*Bitcask, error) {
cfg = newDefaultConfig()
}
if err := checkAndUpgrade(cfg, configPath); err != nil {
return nil, err
}
for _, opt := range options {
if err := opt(cfg); err != nil {
return nil, err
@@ -602,6 +638,24 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return bitcask, nil
}
// checkAndUpgrade checks if DB upgrade is required
// if yes, then applies version upgrade and saves updated config
func checkAndUpgrade(cfg *config.Config, configPath string) error {
if cfg.DBVersion == CurrentDBVersion {
return nil
}
if cfg.DBVersion > CurrentDBVersion {
return ErrInvalidVersion
}
// for v0 to v1 upgrade, we need to append 8 null bytes after each encoded entry in datafiles
if cfg.DBVersion == uint32(0) && CurrentDBVersion == uint32(1) {
log.Warn("upgrading db version, might take some time....")
cfg.DBVersion = CurrentDBVersion
return migrations.ApplyV0ToV1(filepath.Dir(configPath), cfg.MaxDatafileSize)
}
return nil
}
// Backup copies db directory to given path
// it creates path if it does not exist
func (b *Bitcask) Backup(path string) error {