Compare commits

...

16 Commits

Author SHA1 Message Date
6e423ae179 Update CHANGELOG for v0.3.10 2020-12-18 23:18:50 +10:00
8a60b5a370 Fix a bug when MaxValueSize == 0 on Merge operations 2020-12-18 07:35:16 +10:00
Haleem Assal
29e1cf648b Save metadata on Sync (#197)
* Save metadata on Sync

* Add test
2020-12-15 06:32:48 +10:00
Yash Suresh Chandra
3a6235ea03 exclusive lock before closing db in merge (#196)
Co-authored-by: yash <yash.chandra@grabpay.com>
2020-12-13 21:28:54 +10:00
4e7414e920 Fix link to bitcask-bench 2020-12-12 07:55:29 +10:00
a6ed0bc12f Update README.md 2020-12-12 07:54:07 +10:00
0ab7d79246 Add support for unlimited key/value sizes 2020-12-12 02:16:36 +10:00
Georges Varouchas
38156e8461 Gv/issue 165 unlock race condition (#175)
* add failing test case to highlight the race condition on bug

Note: the test "TestLock" is non-deterministic; its outcome depends
on the sequence of instructions scheduled by the Go scheduler on each run.

There are two values, "goroutines" and "succesfulLockCount", which can
be edited to see how the test performs.
With the committed values, respectively "20" and "50", the test failed 100% of the time on
my local machine, running Linux (Ubuntu 20.04).

Sample test output :

$ go test . -run TestLock
--- FAIL: TestLock (0.17s)
    lock_test.go:91: [runner 14] lockCounter was > 1 on  5 occasions, max seen value was  2
    lock_test.go:91: [runner 03] lockCounter was > 1 on  2 occasions, max seen value was  3
    lock_test.go:91: [runner 02] lockCounter was > 1 on  3 occasions, max seen value was  3
    lock_test.go:91: [runner 00] lockCounter was > 1 on  1 occasions, max seen value was  2
    lock_test.go:91: [runner 12] lockCounter was > 1 on  7 occasions, max seen value was  3
    lock_test.go:91: [runner 01] lockCounter was > 1 on  8 occasions, max seen value was  2
    lock_test.go:91: [runner 04] lockCounter was > 1 on  6 occasions, max seen value was  4
    lock_test.go:91: [runner 13] lockCounter was > 1 on  1 occasions, max seen value was  2
    lock_test.go:91: [runner 17] lockCounter was > 1 on  4 occasions, max seen value was  2
    lock_test.go:91: [runner 10] lockCounter was > 1 on  3 occasions, max seen value was  2
    lock_test.go:91: [runner 08] lockCounter was > 1 on  6 occasions, max seen value was  2
    lock_test.go:91: [runner 09] lockCounter was > 1 on  4 occasions, max seen value was  2
    lock_test.go:91: [runner 05] lockCounter was > 1 on  1 occasions, max seen value was  2
    lock_test.go:91: [runner 19] lockCounter was > 1 on  3 occasions, max seen value was  3
    lock_test.go:91: [runner 07] lockCounter was > 1 on  4 occasions, max seen value was  3
    lock_test.go:91: [runner 11] lockCounter was > 1 on  9 occasions, max seen value was  2
    lock_test.go:91: [runner 15] lockCounter was > 1 on  1 occasions, max seen value was  3
    lock_test.go:91: [runner 16] lockCounter was > 1 on  1 occasions, max seen value was  3
FAIL
FAIL	github.com/prologic/bitcask	0.176s
FAIL

* flock: create a wrapper module, local to bitcask, around gofrs.Flock

the racy TestLock has been moved to bitcask/flock

* flock: add test for expected regular locking behavior

* flock: replace gofrs/flock with local implementation

* update go.sum

* Add build constraint for flock_unix.go

Co-authored-by: James Mills <prologic@shortcircuit.net.au>
2020-12-11 20:56:58 +10:00
Yash Suresh Chandra
e1cdffd8f1 new merge approach (#191)
* new merge approach

* code refactor

* comment added

* isMerging flag added to allow 1 merge operation at a time

* get api modified. merge updated (no recursive read locks)

Co-authored-by: yash <yash.chandra@grabpay.com>
Co-authored-by: James Mills <prologic@shortcircuit.net.au>
2020-12-11 20:48:41 +10:00
80c06a3572 Remove some workflows that won't work on Forks anyway 2020-12-11 20:45:26 +10:00
9172eb0f90 Fix CI (again) 2020-12-11 20:43:56 +10:00
c09ce153e9 Fix CI 2020-12-11 20:41:44 +10:00
d3428bac8c Drop support for Windows (Closes #192) 2020-12-11 20:33:53 +10:00
yashschandra
158f6d9888 Get space that can be reclaimed (#189)
* get reclaimable space added

* import order fix

Co-authored-by: yash <yash.chandra@grabpay.com>
2020-12-01 06:07:00 +10:00
yashschandra
f4357e6f18 local live backup support (#185)
* live backup first commit

* exclude lock file in backup

* create path if not exist for backup

Co-authored-by: yash <yash.chandra@grabpay.com>
Co-authored-by: James Mills <prologic@shortcircuit.net.au>
2020-11-30 07:49:02 +10:00
5e01d6d098 Add a few more test cases for concurrent operations 2020-11-27 16:52:08 +10:00
23 changed files with 1168 additions and 188 deletions

View File

@@ -1,11 +0,0 @@
name: Auto approve
on: [pull_request]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: hmarr/auto-approve-action@v2.0.0
if: github.actor == 'dependabot[bot]' || github.actor == 'dependabot-preview[bot]'
with:
github-token: "${{ secrets.GITHUB_TOKEN }}"

View File

@@ -1,10 +0,0 @@
name: AutoAssigner
on: [pull_request]
jobs:
assignAuthor:
runs-on: ubuntu-latest
steps:
- uses: samspills/assign-pr-to-author@v1.0
if: github.event_name == 'pull_request' && github.event.action == 'opened'
with:
repo-token: '${{ secrets.GITHUB_TOKEN }}'

View File

@@ -1,3 +1,4 @@
---
name: Go
on:
push:
@@ -9,12 +10,12 @@ jobs:
name: Build and Test
strategy:
matrix:
go-version: [1.12.x, 1.13.x, 1.14.x]
platform: [ubuntu-latest, macos-latest, windows-latest]
go-version: [1.12.x, 1.13.x, 1.14.x, 1.15.x]
platform: [ubuntu-latest, macos-latest]
runs-on: ${{ matrix.platform }}
steps:
- name: Setup Go ${{ matrix.go-version }}
uses: actions/setup-go@v1
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
id: go

View File

@@ -1,42 +0,0 @@
name: Greetings
on: [pull_request, issues]
jobs:
greeting:
runs-on: ubuntu-latest
steps:
- uses: actions/first-interaction@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
issue-message: |-
Hello!
Welcome to the Bitcask project! Someone will respond to your issue pretty quickly,
the author is pretty responsive 😀 In the meantime; please make sure you have read
the [Contributing](https://github.com/prologic/bitcask/blob/master/CONTRIBUTING.md)
and [Code of Conduct](https://github.com/prologic/bitcask/blob/master/CODE_OF_CONDUCT.md)
documents.
If possible please also make sure your Bug Report or Feature Request is clearly defined
with either examples or a reproducible case (_if a bug_).
Thank you 😃
pr-message: |-
Hello!
Welcome to the Bitcask project!
Thank you for your Pull Request and Contribution! We highly value all contributions to this Project!
Your Pull Request will be reviewed shortly! The author is pretty responsive 😀
In the meantime; please make sure you have read
the [Contributing](https://github.com/prologic/bitcask/blob/master/CONTRIBUTING.md)
and [Code of Conduct](https://github.com/prologic/bitcask/blob/master/CODE_OF_CONDUCT.md)
documents.
Please also ensure that your PR passes all the CI/CD tests -- They will appear in your PR
towards the bottom just above the comment box.
In addition, if you haven't already, please amend your PR by modifying the
[AUTHORS](https://github.com/prologic/bitcask/blob/master/AUTHORS) file and adding
yourself to it! We like to recognize and preserve all contributors in Git history!
Thank you 😃

View File

@@ -1,9 +0,0 @@
name: Labeler
on: [pull_request]
jobs:
label:
runs-on: ubuntu-latest
steps:
- uses: actions/labeler@v2
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"

1
.gitignore vendored
View File

@@ -8,5 +8,6 @@
/bitcask
/bitcaskd
/bitcask_bench*
/cmd/bitcask/bitcask
/cmd/bitcaskd/bitcaskd

View File

@@ -1,4 +1,24 @@
<a name="v0.3.10"></a>
## [v0.3.10](https://github.com/prologic/bitcask/compare/v0.3.9...v0.3.10) (2020-12-18)
### Bug Fixes
* Fix a bug when MaxValueSize == 0 on Merge operations
* Fix link to bitcask-bench
* Fix CI (again)
* Fix CI
### Features
* Add support for unlimited key/value sizes
* Add a few more test cases for concurrent operations
### Updates
* Update README.md
<a name="v0.3.9"></a>
## [v0.3.9](https://github.com/prologic/bitcask/compare/v0.3.8...v0.3.9) (2020-11-17)
@@ -6,6 +26,10 @@
* Fix a race condition around .Close() and .Sync()
### Updates
* Update CHANGELOG for v0.3.9
<a name="v0.3.8"></a>
## [v0.3.8](https://github.com/prologic/bitcask/compare/v0.3.7...v0.3.8) (2020-11-17)

View File

@@ -194,6 +194,12 @@ You can find an [AUTHORS](/AUTHORS) file where we keep a list of contributors to
[![](https://sourcerer.io/fame/prologic/prologic/bitcask/images/6)](https://sourcerer.io/fame/prologic/prologic/bitcask/links/6)
[![](https://sourcerer.io/fame/prologic/prologic/bitcask/images/7)](https://sourcerer.io/fame/prologic/prologic/bitcask/links/7)
## Related Projects
- [bitraft](https://github.com/prologic/bitraft) -- A Distributed Key/Value store (_using Raft_) with a Redis compatible protocol.
- [bitcaskfs](https://github.com/prologic/bitcaskfs) -- A FUSE filesystem for mounting a Bitcask database.
- [bitcask-bench](https://github.com/prologic/bitcask-bench) -- A benchmarking tool comparing Bitcask and several other Go key/value libraries.
## License
bitcask is licensed under the term of the [MIT License](https://github.com/prologic/bitcask/blob/master/LICENSE)

View File

@@ -12,12 +12,14 @@ import (
"sort"
"sync"
"github.com/gofrs/flock"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/flock"
"github.com/prologic/bitcask/internal"
"github.com/prologic/bitcask/internal/config"
"github.com/prologic/bitcask/internal/data"
"github.com/prologic/bitcask/internal/data/codec"
"github.com/prologic/bitcask/internal/index"
"github.com/prologic/bitcask/internal/metadata"
)
var (
@@ -42,6 +44,10 @@ var (
// ErrDatabaseLocked is the error returned if the database is locked
// (typically opened by another process)
ErrDatabaseLocked = errors.New("error: database locked")
// ErrMergeInProgress is the error returned if Merge is called while another merge
// is already in progress
ErrMergeInProgress = errors.New("error: merge already in progress")
)
// Bitcask is a struct that represents an on-disk LSM and WAL data structure
@@ -59,6 +65,8 @@ type Bitcask struct {
datafiles map[int]data.Datafile
trie art.Tree
indexer index.Indexer
metadata *metadata.MetaData
isMerging bool
}
// Stats is a struct returned by Stats() on an open Bitcask instance
@@ -88,14 +96,19 @@ func (b *Bitcask) Stats() (stats Stats, err error) {
// database.
func (b *Bitcask) Close() error {
b.mu.RLock()
defer b.mu.RUnlock()
defer func() {
b.mu.RUnlock()
b.Flock.Unlock()
os.Remove(b.Flock.Path())
}()
return b.close()
}
if err := b.indexer.Save(b.trie, filepath.Join(b.path, "index")); err != nil {
func (b *Bitcask) close() error {
defer b.Flock.Unlock()
if err := b.saveIndex(); err != nil {
return err
}
b.metadata.IndexUpToDate = true
if err := b.saveMetadata(); err != nil {
return err
}
@@ -112,18 +125,29 @@ func (b *Bitcask) Close() error {
func (b *Bitcask) Sync() error {
b.mu.RLock()
defer b.mu.RUnlock()
if err := b.saveMetadata(); err != nil {
return err
}
return b.curr.Sync()
}
// Get retrieves the value of the given key. If the key is not found or an/I/O
// error occurs a null byte slice is returned along with the error.
// Get retrieves the value stored for the given key. It holds the read lock
// for the duration of the lookup and delegates the actual work to get.
func (b *Bitcask) Get(key []byte) ([]byte, error) {
b.mu.RLock()
defer b.mu.RUnlock()
return b.get(key)
}
// get retrieves the value of the given key. If the key is not found or an I/O
// error occurs a nil byte slice is returned along with the error.
func (b *Bitcask) get(key []byte) ([]byte, error) {
var df data.Datafile
b.mu.RLock()
value, found := b.trie.Search(key)
if !found {
b.mu.RUnlock()
return nil, ErrKeyNotFound
}
@@ -136,7 +160,6 @@ func (b *Bitcask) Get(key []byte) ([]byte, error) {
}
e, err := df.ReadAt(item.Offset, item.Size)
b.mu.RUnlock()
if err != nil {
return nil, err
}
@@ -162,30 +185,35 @@ func (b *Bitcask) Put(key, value []byte) error {
if len(key) == 0 {
return ErrEmptyKey
}
if uint32(len(key)) > b.config.MaxKeySize {
if b.config.MaxKeySize > 0 && uint32(len(key)) > b.config.MaxKeySize {
return ErrKeyTooLarge
}
if uint64(len(value)) > b.config.MaxValueSize {
if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize {
return ErrValueTooLarge
}
b.mu.Lock()
defer b.mu.Unlock()
offset, n, err := b.put(key, value)
if err != nil {
b.mu.Unlock()
return err
}
if b.config.Sync {
if err := b.curr.Sync(); err != nil {
b.mu.Unlock()
return err
}
}
// in case of a successful `put`, IndexUpToDate will always be false
b.metadata.IndexUpToDate = false
if oldItem, found := b.trie.Search(key); found {
b.metadata.ReclaimableSpace += oldItem.(internal.Item).Size
}
item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
b.trie.Insert(key, item)
b.mu.Unlock()
return nil
}
@@ -199,6 +227,9 @@ func (b *Bitcask) Delete(key []byte) error {
b.mu.Unlock()
return err
}
if item, found := b.trie.Search(key); found {
b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(key))
}
b.trie.Delete(key)
b.mu.Unlock()
@@ -212,7 +243,12 @@ func (b *Bitcask) DeleteAll() (err error) {
b.trie.ForEach(func(node art.Node) bool {
_, _, err = b.put(node.Key(), []byte{})
return err == nil
if err != nil {
return false
}
item, _ := b.trie.Search(node.Key())
b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(node.Key()))
return true
})
b.trie = art.New()
@@ -302,21 +338,58 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
return -1, 0, err
}
b.curr = curr
err = b.saveIndex()
if err != nil {
return -1, 0, err
}
}
e := internal.NewEntry(key, value)
return b.curr.Write(e)
}
// closeCurrentFile closes the active datafile, reopens it in read-only mode
// and registers the read-only handle in b.datafiles under the same file ID.
// b.curr itself is left untouched.
func (b *Bitcask) closeCurrentFile() error {
	if err := b.curr.Close(); err != nil {
		return err
	}

	fileID := b.curr.FileID()
	readOnly, err := data.NewDatafile(
		b.path, fileID, true,
		b.config.MaxKeySize, b.config.MaxValueSize,
		b.config.FileFileModeBeforeUmask,
	)
	if err != nil {
		return err
	}

	b.datafiles[fileID] = readOnly
	return nil
}
// openNewWritableFile creates the next datafile (current file ID + 1) in
// writable mode and makes it the active file that new entries are written to.
func (b *Bitcask) openNewWritableFile() error {
	nextID := b.curr.FileID() + 1
	writable, err := data.NewDatafile(
		b.path, nextID, false,
		b.config.MaxKeySize, b.config.MaxValueSize,
		b.config.FileFileModeBeforeUmask,
	)
	if err != nil {
		return err
	}

	b.curr = writable
	return nil
}
// Reopen reloads the database while holding the write lock. The actual work
// is done by reopen; this wrapper only takes care of locking.
func (b *Bitcask) Reopen() error {
b.mu.Lock()
defer b.mu.Unlock()
return b.reopen()
}
// reopen reloads a bitcask object with index and datafiles
// caller of this method should take care of locking
func (b *Bitcask) reopen() error {
datafiles, lastID, err := loadDatafiles(b.path, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
if err != nil {
return err
}
t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles)
t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles, lastID, b.metadata.IndexUpToDate)
if err != nil {
return err
}
@@ -337,6 +410,34 @@ func (b *Bitcask) Reopen() error {
// and deleted keys removes. Duplicate key/value pairs are also removed.
// Call this function periodically to reclaim disk space.
func (b *Bitcask) Merge() error {
b.mu.Lock()
if b.isMerging {
b.mu.Unlock()
return ErrMergeInProgress
}
b.isMerging = true
b.mu.Unlock()
defer func() {
b.isMerging = false
}()
b.mu.RLock()
err := b.closeCurrentFile()
if err != nil {
b.mu.RUnlock()
return err
}
filesToMerge := make([]int, 0, len(b.datafiles))
for k := range b.datafiles {
filesToMerge = append(filesToMerge, k)
}
err = b.openNewWritableFile()
if err != nil {
b.mu.RUnlock()
return err
}
b.mu.RUnlock()
sort.Ints(filesToMerge)
// Temporary merged database path
temp, err := ioutil.TempDir(b.path, "merge")
if err != nil {
@@ -345,7 +446,7 @@ func (b *Bitcask) Merge() error {
defer os.RemoveAll(temp)
// Create a merged database
mdb, err := Open(temp, b.options...)
mdb, err := Open(temp, withConfig(b.config))
if err != nil {
return err
}
@@ -354,7 +455,12 @@ func (b *Bitcask) Merge() error {
// Doing this automatically strips deleted keys and
// old key/value pairs
err = b.Fold(func(key []byte) error {
value, err := b.Get(key)
item, _ := b.trie.Search(key)
// if key was updated after start of merge operation, nothing to do
if item.(internal.Item).FileID > filesToMerge[len(filesToMerge)-1] {
return nil
}
value, err := b.get(key)
if err != nil {
return err
}
@@ -368,29 +474,36 @@ func (b *Bitcask) Merge() error {
if err != nil {
return err
}
err = mdb.Close()
if err != nil {
if err = mdb.Close(); err != nil {
return err
}
// no reads and writes till we reopen
b.mu.Lock()
defer b.mu.Unlock()
if err = b.close(); err != nil {
return err
}
// Close the database
err = b.Close()
if err != nil {
return err
}
// Remove all data files
// Remove data files
files, err := ioutil.ReadDir(b.path)
if err != nil {
return err
}
for _, file := range files {
if !file.IsDir() {
err := os.RemoveAll(path.Join([]string{b.path, file.Name()}...))
if err != nil {
return err
}
if file.IsDir() {
continue
}
ids, err := internal.ParseIds([]string{file.Name()})
if err != nil {
return err
}
// if datafile was created after start of merge, skip
if len(ids) > 0 && ids[0] > filesToMerge[len(filesToMerge)-1] {
continue
}
err = os.RemoveAll(path.Join(b.path, file.Name()))
if err != nil {
return err
}
}
@@ -408,9 +521,10 @@ func (b *Bitcask) Merge() error {
return err
}
}
b.metadata.ReclaimableSpace = 0
// And finally reopen the database
return b.Reopen()
return b.reopen()
}
// Open opens the database at the given path with optional options.
@@ -418,8 +532,9 @@ func (b *Bitcask) Merge() error {
// configuration options as functions.
func Open(path string, options ...Option) (*Bitcask, error) {
var (
cfg *config.Config
err error
cfg *config.Config
err error
meta *metadata.MetaData
)
configPath := filepath.Join(path, "config.json")
@@ -442,12 +557,18 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return nil, err
}
meta, err = loadMetadata(path)
if err != nil {
return nil, err
}
bitcask := &Bitcask{
Flock: flock.New(filepath.Join(path, "lock")),
config: cfg,
options: options,
path: path,
indexer: index.NewIndexer(),
Flock: flock.New(filepath.Join(path, "lock")),
config: cfg,
options: options,
path: path,
indexer: index.NewIndexer(),
metadata: meta,
}
locked, err := bitcask.Flock.TryLock()
@@ -475,6 +596,36 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return bitcask, nil
}
// Backup copies the database directory to the given path, skipping the
// "lock" file. The destination directory is created (including parents)
// when it does not exist yet.
func (b *Bitcask) Backup(path string) error {
	excluded := []string{"lock"}

	if internal.Exists(path) {
		return internal.Copy(b.path, path, excluded)
	}

	err := os.MkdirAll(path, b.config.DirFileModeBeforeUmask)
	if err != nil {
		return err
	}
	return internal.Copy(b.path, path, excluded)
}
// saveIndex persists the in-memory index. The index is first written to a
// "temp_index" file and then renamed to "index", so the existing "index" file
// is only replaced once the save itself has succeeded.
func (b *Bitcask) saveIndex() error {
	var (
		tmpPath   = filepath.Join(b.path, "temp_index")
		finalPath = filepath.Join(b.path, "index")
	)

	if err := b.indexer.Save(b.trie, tmpPath); err != nil {
		return err
	}
	return os.Rename(tmpPath, finalPath)
}
// saveMetadata writes the database metadata to "meta.json" inside the
// database directory.
// NOTE(review): the mode passed to Save is DirFileModeBeforeUmask; confirm
// whether the file mode (FileFileModeBeforeUmask) was intended here.
func (b *Bitcask) saveMetadata() error {
return b.metadata.Save(filepath.Join(b.path, "meta.json"), b.config.DirFileModeBeforeUmask)
}
// Reclaimable returns the current value of the ReclaimableSpace counter kept
// in the database metadata.
// NOTE(review): the counter is read without taking b.mu — confirm whether
// callers may invoke this concurrently with writers.
func (b *Bitcask) Reclaimable() int64 {
return b.metadata.ReclaimableSpace
}
func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64, fileModeBeforeUmask os.FileMode) (datafiles map[int]data.Datafile, lastID int, err error) {
fns, err := internal.GetDatafiles(path)
if err != nil {
@@ -513,34 +664,56 @@ func getSortedDatafiles(datafiles map[int]data.Datafile) []data.Datafile {
return out
}
func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile) (art.Tree, error) {
func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile, lastID int, indexUpToDate bool) (art.Tree, error) {
t, found, err := indexer.Load(filepath.Join(path, "index"), maxKeySize)
if err != nil {
return nil, err
}
if !found {
sortedDatafiles := getSortedDatafiles(datafiles)
for _, df := range sortedDatafiles {
var offset int64
for {
e, n, err := df.Read()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
// Tombstone value (deleted key)
if len(e.Value) == 0 {
t.Delete(e.Key)
offset += n
continue
}
item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
t.Insert(e.Key, item)
offset += n
}
if found && indexUpToDate {
return t, nil
}
if found {
if err := loadIndexFromDatafile(t, datafiles[lastID]); err != nil {
return nil, err
}
return t, nil
}
sortedDatafiles := getSortedDatafiles(datafiles)
for _, df := range sortedDatafiles {
if err := loadIndexFromDatafile(t, df); err != nil {
return nil, err
}
}
return t, nil
}
// loadIndexFromDatafile replays every entry of df into the index t.
// Entries are read sequentially until io.EOF; an entry with an empty value is
// treated as a tombstone and removes the key, any other entry records the
// file ID, offset and size of the entry under its key. Any read error other
// than io.EOF is returned.
func loadIndexFromDatafile(t art.Tree, df data.Datafile) error {
	var pos int64
	for {
		entry, size, err := df.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		if len(entry.Value) == 0 {
			// Tombstone value (deleted key)
			t.Delete(entry.Key)
		} else {
			t.Insert(entry.Key, internal.Item{FileID: df.FileID(), Offset: pos, Size: size})
		}
		// The offset advances by the size of every entry, tombstones included.
		pos += size
	}
}
// loadMetadata loads the metadata stored in "meta.json" under path. When the
// file does not exist a zero-valued MetaData is returned instead.
func loadMetadata(path string) (*metadata.MetaData, error) {
	metaPath := filepath.Join(path, "meta.json")
	if internal.Exists(metaPath) {
		return metadata.Load(metaPath)
	}
	return new(metadata.MetaData), nil
}

View File

@@ -9,7 +9,6 @@ import (
"path"
"path/filepath"
"reflect"
"runtime"
"sort"
"strings"
"sync"
@@ -53,12 +52,6 @@ func SortByteArrays(src [][]byte) [][]byte {
return sorted
}
func skipIfWindows(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Skipping this test on Windows")
}
}
func TestAll(t *testing.T) {
var (
db *Bitcask
@@ -136,6 +129,14 @@ func TestAll(t *testing.T) {
assert.NoError(err)
})
t.Run("Backup", func(t *testing.T) {
path, err := ioutil.TempDir("", "backup")
defer os.RemoveAll(path)
assert.NoError(err)
err = db.Backup(filepath.Join(path, "db-backup"))
assert.NoError(err)
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
@@ -306,6 +307,64 @@ func TestDeletedKeys(t *testing.T) {
})
}
// TestMetadata checks that the IndexUpToDate flag and the reclaimable-space
// counter kept in the database metadata are updated as expected by
// Close/Open, Put, Delete, DeleteAll and Merge. The subtests share one
// database and build on each other's state, so their order matters.
func TestMetadata(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
db, err := Open(testdir)
assert.NoError(err)
err = db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
// close and reopen so that the metadata is loaded back from disk
err = db.Close()
assert.NoError(err)
db, err = Open(testdir)
assert.NoError(err)
t.Run("IndexUptoDateAfterCloseAndOpen", func(t *testing.T) {
assert.Equal(true, db.metadata.IndexUpToDate)
})
t.Run("IndexUptoDateAfterPut", func(t *testing.T) {
assert.NoError(db.Put([]byte("foo1"), []byte("bar1")))
assert.Equal(false, db.metadata.IndexUpToDate)
})
t.Run("Reclaimable", func(t *testing.T) {
assert.Equal(int64(0), db.Reclaimable())
})
// writing a key for the first time must not add reclaimable space
t.Run("ReclaimableAfterNewPut", func(t *testing.T) {
assert.NoError(db.Put([]byte("hello"), []byte("world")))
assert.Equal(int64(0), db.Reclaimable())
})
// overwriting an existing key makes the old entry reclaimable
t.Run("ReclaimableAfterRepeatedPut", func(t *testing.T) {
assert.NoError(db.Put([]byte("hello"), []byte("world")))
assert.Equal(int64(26), db.Reclaimable())
})
t.Run("ReclaimableAfterDelete", func(t *testing.T) {
assert.NoError(db.Delete([]byte("hello")))
assert.Equal(int64(73), db.Reclaimable())
})
// deleting a key that is not present must leave the counter unchanged
t.Run("ReclaimableAfterNonExistingDelete", func(t *testing.T) {
assert.NoError(db.Delete([]byte("hello1")))
assert.Equal(int64(73), db.Reclaimable())
})
t.Run("ReclaimableAfterDeleteAll", func(t *testing.T) {
assert.NoError(db.DeleteAll())
assert.Equal(int64(158), db.Reclaimable())
})
// a merge resets the counter
t.Run("ReclaimableAfterMerge", func(t *testing.T) {
assert.NoError(db.Merge())
assert.Equal(int64(0), db.Reclaimable())
})
t.Run("IndexUptoDateAfterMerge", func(t *testing.T) {
assert.Equal(true, db.metadata.IndexUpToDate)
})
t.Run("ReclaimableAfterMergeAndDeleteAll", func(t *testing.T) {
assert.NoError(db.DeleteAll())
assert.Equal(int64(0), db.Reclaimable())
})
}
func TestConfigErrors(t *testing.T) {
assert := assert.New(t)
@@ -337,9 +396,6 @@ func TestConfigErrors(t *testing.T) {
}
func TestAutoRecovery(t *testing.T) {
if runtime.GOOS == "windows" {
t.SkipNow()
}
withAutoRecovery := []bool{false, true}
for _, autoRecovery := range withAutoRecovery {
@@ -577,6 +633,11 @@ func TestSync(t *testing.T) {
value := []byte("foobar")
err = db.Put(key, value)
})
t.Run("Put", func(t *testing.T) {
err = db.Put([]byte("hello"), []byte("world"))
assert.NoError(err)
})
}
func TestMaxKeySize(t *testing.T) {
@@ -707,8 +768,6 @@ func TestStatsError(t *testing.T) {
})
t.Run("Test", func(t *testing.T) {
skipIfWindows(t)
t.Run("FabricatedDestruction", func(t *testing.T) {
// This would never happen in reality :D
// Or would it? :)
@@ -724,8 +783,6 @@ func TestStatsError(t *testing.T) {
}
func TestDirFileModeBeforeUmask(t *testing.T) {
skipIfWindows(t)
assert := assert.New(t)
t.Run("Setup", func(t *testing.T) {
@@ -808,8 +865,6 @@ func TestDirFileModeBeforeUmask(t *testing.T) {
}
func TestFileFileModeBeforeUmask(t *testing.T) {
skipIfWindows(t)
assert := assert.New(t)
t.Run("Setup", func(t *testing.T) {
@@ -984,7 +1039,7 @@ func TestMerge(t *testing.T) {
s3, err := db.Stats()
assert.NoError(err)
assert.Equal(1, s3.Datafiles)
assert.Equal(2, s3.Datafiles)
assert.Equal(1, s3.Keys)
assert.True(s3.Size > s1.Size)
assert.True(s3.Size < s2.Size)
@@ -1208,7 +1263,7 @@ func TestCloseErrors(t *testing.T) {
assert.NoError(err)
mockIndexer := new(mocks.Indexer)
mockIndexer.On("Save", db.trie, filepath.Join(db.path, "index")).Return(ErrMockError)
mockIndexer.On("Save", db.trie, filepath.Join(db.path, "temp_index")).Return(ErrMockError)
db.indexer = mockIndexer
err = db.Close()
@@ -1279,8 +1334,6 @@ func TestMergeErrors(t *testing.T) {
assert := assert.New(t)
t.Run("RemoveDatabaseDirectory", func(t *testing.T) {
skipIfWindows(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
@@ -1316,18 +1369,19 @@ func TestMergeErrors(t *testing.T) {
assert.NoError(err)
defer os.RemoveAll(testdir)
db, err := Open(testdir)
db, err := Open(testdir, WithMaxDatafileSize(22))
assert.NoError(err)
assert.NoError(db.Put([]byte("foo"), []byte("bar")))
assert.NoError(db.Put([]byte("bar"), []byte("baz")))
mockDatafile := new(mocks.Datafile)
mockDatafile.On("FileID").Return(0)
mockDatafile.On("Close").Return(nil)
mockDatafile.On("ReadAt", int64(0), int64(22)).Return(
internal.Entry{},
ErrMockError,
)
db.curr = mockDatafile
db.datafiles[0] = mockDatafile
err = db.Merge()
assert.Error(err)
@@ -1406,6 +1460,92 @@ func TestConcurrent(t *testing.T) {
wg.Wait()
})
// Test concurrent Put() with concurrent Scan()
t.Run("PutScan", func(t *testing.T) {
doPut := func(wg *sync.WaitGroup, x int) {
defer func() {
wg.Done()
}()
for i := 0; i <= 100; i++ {
if i%x == 0 {
key := []byte(fmt.Sprintf("k%d", i))
value := []byte(fmt.Sprintf("v%d", i))
err := db.Put(key, value)
assert.NoError(err)
}
}
}
doScan := func(wg *sync.WaitGroup, x int) {
defer func() {
wg.Done()
}()
for i := 0; i <= 100; i++ {
if i%x == 0 {
err := db.Scan([]byte("k"), func(key []byte) error {
return nil
})
assert.NoError(err)
}
}
}
wg := &sync.WaitGroup{}
wg.Add(6)
go doPut(wg, 2)
go doPut(wg, 3)
go doPut(wg, 5)
go doScan(wg, 1)
go doScan(wg, 2)
go doScan(wg, 4)
wg.Wait()
})
// XXX: This has data races
/* Test concurrent Scan() with concurrent Merge()
t.Run("ScanMerge", func(t *testing.T) {
doScan := func(wg *sync.WaitGroup, x int) {
defer func() {
wg.Done()
}()
for i := 0; i <= 100; i++ {
if i%x == 0 {
err := db.Scan([]byte("k"), func(key []byte) error {
return nil
})
assert.NoError(err)
}
}
}
doMerge := func(wg *sync.WaitGroup, x int) {
defer func() {
wg.Done()
}()
for i := 0; i <= 100; i++ {
if i%x == 0 {
err := db.Merge()
assert.NoError(err)
}
}
}
wg := &sync.WaitGroup{}
wg.Add(6)
go doScan(wg, 2)
go doScan(wg, 3)
go doScan(wg, 5)
go doMerge(wg, 1)
go doMerge(wg, 2)
go doMerge(wg, 4)
wg.Wait()
})
*/
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
@@ -1485,7 +1625,6 @@ func TestLocking(t *testing.T) {
_, err = Open(testdir)
assert.Error(err)
assert.Equal(ErrDatabaseLocked, err)
}
type benchmarkTestCase struct {

97
flock/flock.go Normal file
View File

@@ -0,0 +1,97 @@
package flock
import (
"errors"
"os"
"sync"
)
// Flock is a lock backed by a file on disk, identified by its path.
type Flock struct {
// path is the location of the lock file.
path string
// m guards fh and serializes Lock, TryLock and Unlock on this instance.
m sync.Mutex
// fh is the open lock file handle stored by a successful Lock or TryLock;
// it is nil until then.
fh *os.File
}
var (
// ErrAlreadyLocked is returned by Lock and TryLock when this Flock
// instance already holds a file handle.
ErrAlreadyLocked = errors.New("Double lock: already own the lock")
// ErrLockFailed is returned when the lock could not be acquired.
ErrLockFailed = errors.New("Could not acquire lock")
// ErrLockNotHeld is returned by Unlock when this Flock instance holds no
// file handle.
ErrLockNotHeld = errors.New("Could not unlock, lock is not held")
// ErrInodeChangedAtPath is returned when the file found at the lock path
// is not the same inode as the one behind the open file handle.
ErrInodeChangedAtPath = errors.New("Inode changed at path")
)
// New creates a *Flock for the lock file located at path. Nothing is opened
// or locked until Lock or TryLock is called.
func New(path string) *Flock {
	lock := new(Flock)
	lock.path = path
	return lock
}
// Path returns the path of the lock file this Flock was created with.
func (f *Flock) Path() string {
return f.path
}
// Lock acquires the lock, blocking for as long as another process holds it.
// Use TryLock for a non-blocking variant.
//
// It returns ErrAlreadyLocked when this instance already holds the lock, any
// error reported while taking the lock, or ErrLockFailed when no file handle
// was obtained.
func (f *Flock) Lock() error {
	f.m.Lock()
	defer f.m.Unlock()

	if f.fh != nil {
		return ErrAlreadyLocked
	}

	for {
		handle, err := lock_sys(f.path, false)
		switch {
		case err == ErrInodeChangedAtPath:
			// Treated as "some other process holds the lock": retry locking.
			continue
		case err != nil:
			return err
		case handle == nil:
			return ErrLockFailed
		}

		f.fh = handle
		return nil
	}
}
// TryLock attempts to acquire the lock without blocking and returns
// immediately if the lock is already owned by another process.
//
// It returns (true, nil) on success, (false, ErrAlreadyLocked) when this
// instance already holds the lock, and (false, ErrLockFailed) for every
// failure reported while taking the lock.
func (f *Flock) TryLock() (bool, error) {
	f.m.Lock()
	defer f.m.Unlock()

	if f.fh != nil {
		return false, ErrAlreadyLocked
	}

	handle, err := lock_sys(f.path, true)
	if err == nil {
		f.fh = handle
		return true, nil
	}
	// The underlying cause is deliberately not exposed to the caller.
	return false, ErrLockFailed
}
// Unlock removes the lock file from disk and releases the lock.
// Whatever the result of `.Unlock()`, the caller must assume that it does not
// hold the lock anymore.
//
// It returns ErrLockNotHeld when this instance does not hold the lock.
// Otherwise it returns the error from removing the lock file, if any, or else
// the error from closing the file handle.
func (f *Flock) Unlock() error {
	f.m.Lock()
	defer f.m.Unlock()

	if f.fh == nil {
		return ErrLockNotHeld
	}

	// Drop the handle before doing anything else: the lock is considered
	// released whatever happens below. This lets the same Flock be locked
	// again later, and makes a second Unlock return ErrLockNotHeld instead of
	// operating on an already closed file.
	fh := f.fh
	f.fh = nil

	rmErr := rm_if_match(fh, f.path)
	closeErr := fh.Close()

	if rmErr != nil {
		return rmErr
	}
	return closeErr
}

121
flock/flock_test.go Normal file
View File

@@ -0,0 +1,121 @@
package flock
import (
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// WARNING: the tests in this file delete the file located at "testLockPath".
// Choose an adequate temporary file name.
const testLockPath = "/tmp/bitcask_unit_test_lock" // file path to use for the lock
// TestTryLock checks that two Flock instances on the same path exclude each
// other when using TryLock, and that the second one can take the lock once
// the first has released it.
func TestTryLock(t *testing.T) {
// test that basic locking functionalities are consistent
// make sure there is no lock file present when starting this test
os.Remove(testLockPath)
assert := assert.New(t)
lock1 := New(testLockPath)
lock2 := New(testLockPath)
// 1- take the first lock
locked1, err := lock1.TryLock()
assert.True(locked1)
assert.NoError(err)
// 2- check that the second lock cannot acquire the lock
locked2, err := lock2.TryLock()
assert.False(locked2)
assert.Error(err)
// 3- release the first lock
err = lock1.Unlock()
assert.NoError(err)
// 4- check that the second lock can acquire and then release the lock without error
locked2, err = lock2.TryLock()
assert.True(locked2)
assert.NoError(err)
err = lock2.Unlock()
assert.NoError(err)
}
// TestLock checks that a blocking Lock call made while another Flock holds
// the lock on the same path eventually returns without error once that lock
// has been released.
func TestLock(t *testing.T) {
assert := assert.New(t)
// make sure there is no lock file present when starting this test
os.Remove(testLockPath)
// syncChan carries the "about to lock" signal and is closed by the
// sub routine when it is done
syncChan := make(chan bool)
// main goroutine: take lock on testLockPath
lock := New(testLockPath)
err := lock.Lock()
assert.NoError(err)
go func() {
// sub routine:
lock := New(testLockPath)
// before entering the blocking '.Lock()' call, signal we are about to do it
// see below: the main goroutine will wait for a small delay before releasing the lock
syncChan <- true
// '.Lock()' should ultimately return without error:
err := lock.Lock()
assert.NoError(err)
err = lock.Unlock()
assert.NoError(err)
close(syncChan)
}()
// wait for the "ready" signal from the sub routine,
<-syncChan
// after that signal wait for a small delay before releasing the lock
<-time.After(100 * time.Microsecond)
err = lock.Unlock()
assert.NoError(err)
// wait for the sub routine to finish
<-syncChan
}
// TestErrorConditions covers the misuse cases that must return an error:
//   - releasing a lock that is not held
//   - locking the same handle twice (through TryLock and through Lock)
func TestErrorConditions(t *testing.T) {
	// -- setup
	assert := assert.New(t)

	// make sure there is no present lock when starting this test
	os.Remove(testLockPath)

	lock := New(testLockPath)

	// -- run tests :
	err := lock.Unlock()
	assert.Error(err, "you can't release a lock you do not hold")

	// take the lock once, and verify that we really hold it: without this check
	// the "lock twice" assertions below would also pass if the first attempt
	// had failed for an unrelated reason.
	locked, err := lock.TryLock()
	assert.True(locked, "the first TryLock() must acquire the lock")
	assert.NoError(err)

	locked, err = lock.TryLock()
	assert.False(locked)
	assert.Error(err, "you can't lock twice the same lock (using .TryLock())")

	err = lock.Lock()
	assert.Error(err, "you can't lock twice the same lock (using .Lock())")

	// -- teardown (best effort: the outcome is not part of what this test checks)
	_ = lock.Unlock()
}

79
flock/flock_unix.go Normal file
View File

@@ -0,0 +1,79 @@
// +build !aix,!windows
package flock
import (
"os"
"golang.org/x/sys/unix"
)
// lock_sys opens (creating it if needed) the file at path and takes an
// exclusive flock on it. When nonBlocking is set, LOCK_NB is added to the
// flock flags. After the flock call succeeds, the open handle is compared with
// whatever currently sits at path; a mismatch yields ErrInodeChangedAtPath.
//
// On success the open file is returned. On any failure after the open, the
// handle is closed before returning so the descriptor is not leaked.
func lock_sys(path string, nonBlocking bool) (_ *os.File, err error) {
	handle, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		return nil, err
	}

	how := unix.LOCK_EX
	if nonBlocking {
		how = unix.LOCK_EX | unix.LOCK_NB
	}

	if err = unix.Flock(int(handle.Fd()), how); err != nil {
		handle.Close()
		return nil, err
	}

	// the path may have been unlinked / replaced between open and flock
	if !sameInodes(handle, path) {
		handle.Close()
		return nil, ErrInodeChangedAtPath
	}

	return handle, nil
}
// rm_if_match removes the file at path, but only when that path still refers
// to the file opened as fh; otherwise it returns ErrInodeChangedAtPath and
// leaves the filesystem untouched. Errors from os.Remove are returned as is.
//
// The check-then-remove sequence is not atomic. It is sufficient as long as
// only cooperative processes are involved, and it guards against the main
// pitfall: the lock file having been forcefully removed or replaced by
// someone else while we held it.
func rm_if_match(fh *os.File, path string) error {
	if sameInodes(fh, path) {
		return os.Remove(path)
	}
	return ErrInodeChangedAtPath
}
// sameInodes reports whether the open file f and the file currently found at
// path are the same file.
//
// It returns false when either file cannot be stat'ed (for instance when path
// no longer exists).
//
// The comparison is delegated to os.SameFile rather than comparing raw inode
// numbers: an inode number is only unique within one filesystem, so the device
// has to be part of the identity check as well.
func sameInodes(f *os.File, path string) bool {
	// identity of the file behind the open handle
	openInfo, err := f.Stat()
	if err != nil {
		return false
	}

	// identity of whatever is on disk at path right now
	pathInfo, err := os.Stat(path)
	if err != nil {
		return false
	}

	return os.SameFile(openInfo, pathInfo)
}

236
flock/race_test.go Normal file
View File

@@ -0,0 +1,236 @@
package flock
// the "nd" in "nd_test.go" stands for non-deterministic
import (
"errors"
"os"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// The two tests in this file exercise some concurrency scenarios :
// 1- TestRaceLock() runs several threads racing for the same lock
// 2- TestShatteredLock() runs racing threads, along with another thread which forcibly removes the file from disk
//
// Note that these tests are non-deterministic : the coverage produced by each test depends
// on how the runtime chooses to schedule the concurrent goroutines.
var lockerCount int64
// lockAndCount makes a single non-blocking attempt to lock "lockpath".
//
// When the attempt fails it returns 0 right away.
//
// When it succeeds, and until the deferred Unlock runs (mirroring how
// "func (b *Bitcask) Close()" releases its lock), it:
//   - bumps the shared "lockerCount" and remembers the resulting value,
//   - pauses briefly to emulate a workload,
//   - brings "lockerCount" back down,
//   - returns the remembered value.
//
// If locking really is exclusive, everything between the bump and the release
// is a critical section, so the remembered value can only ever be 1.
//
// A result > 0 therefore means the lock was acquired (and > 1 means mutual
// exclusion was violated); a result of 0 means TryLock failed.
func lockAndCount(lockpath string) int64 {
	lock := New(lockpath)
	if acquired, _ := lock.TryLock(); !acquired {
		return 0
	}
	defer lock.Unlock()

	seen := atomic.AddInt64(&lockerCount, 1)

	// emulate a workload :
	time.Sleep(1 * time.Microsecond)

	atomic.AddInt64(&lockerCount, -1)
	return seen
}
// locker keeps calling lockAndCount on lockPath until one of these holds:
//   - the "timeout" channel becomes readable (it is polled before each attempt),
//   - lockAndCount has reported a successful acquisition "successfulLockCount" times.
//
// It marks the test as failed, and logs a line tagged with the runner id, when
// it stopped because of the timeout and/or when any acquisition observed a
// counter value greater than 1.
func locker(t *testing.T, id int, lockPath string, successfulLockCount int, timeout <-chan struct{}) {
	// non-blocking poll of the timeout channel
	expired := func() bool {
		select {
		case <-timeout:
			return true
		default:
			return false
		}
	}

	var (
		gaveUp     bool
		violations int
		worst      int64
	)

	for remaining := successfulLockCount; remaining > 0; {
		if expired() {
			gaveUp = true
			break
		}

		seen := lockAndCount(lockPath)
		if seen <= 0 {
			// lock not acquired this time: try again
			continue
		}
		remaining--

		if seen == 1 {
			// the only legitimate value
			continue
		}

		// more than one holder at once: record it and track the highest value seen
		violations++
		if seen > worst {
			worst = seen
		}
	}

	// check failure cases :
	if gaveUp {
		t.Errorf("[runner %02d] timed out", id)
	}
	if violations > 0 {
		t.Errorf("[runner %02d] lockCounter was > 1 on %2.d occasions, max seen value was %2.d", id, violations, worst)
	}
}
// TestRaceLock checks that no error occurs when several concurrent actors (goroutines in this case) race for the same lock.
//
// It launches "goroutines" instances of locker, each of which must take the lock
// "successfulLockCount" times, and waits for all of them; the failure conditions
// themselves are reported by locker.
func TestRaceLock(t *testing.T) {
// test parameters, written in code :
// you may want to tweak these values for testing
goroutines := 20 // number of concurrent "locker" goroutines to launch
successfulLockCount := 50 // how many times a "locker" will successfully take the lock before halting
// make sure there is no present lock when starting this test
os.Remove(testLockPath)
// timeout implemented in code
// (the lock acquisition depends on the behavior of the filesystem,
// avoid sending CI in endless loop if something fishy happens on the test server ...)
// tweak this value if needed ; comment out the "close(ch)" instruction below
timeout := 10 * time.Second
// ch is closed once the timeout elapses; every locker polls it between attempts
ch := make(chan struct{})
go func() {
<-time.After(timeout)
close(ch)
}()
// wg tracks the locker goroutines so the test only returns once all of them are done
wg := &sync.WaitGroup{}
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
// i is passed as an argument so each goroutine gets its own id
go func(id int) {
locker(t, id, testLockPath, successfulLockCount, ch)
wg.Done()
}(i)
}
wg.Wait()
}
// isExpectedError reports whether err is one of the outcomes tolerated when
// the lock file may be removed behind our back: no error at all, the
// ErrInodeChangedAtPath sentinel, or anything wrapping syscall.ENOENT.
func isExpectedError(err error) bool {
	return err == nil ||
		err == ErrInodeChangedAtPath ||
		errors.Is(err, syscall.ENOENT)
}
// TestShatteredLock runs concurrent goroutines on one lock, with an extra goroutine
// which removes the lock file from disk without checking the locks
// (e.g: a user who would run 'rm lockfile' in a loop while the program is running).
//
// In this scenario, errors may occur on .Unlock() ; this test checks that only errors
// relating to the file being deleted occur (see isExpectedError).
//
// This test additionally logs the number of errors that occurred, grouped by error message.
func TestShatteredLock(t *testing.T) {
	// test parameters, written in code :
	// you may want to tweak these values for testing
	goroutines := 4           // number of concurrent "locker" goroutines to launch
	successfulLockCount := 10 // how many times a "locker" will successfully take the lock before halting

	// make sure there is no present lock when starting this test
	os.Remove(testLockPath)

	assert := assert.New(t)

	wg := &sync.WaitGroup{}
	wg.Add(goroutines)

	stopChan := make(chan struct{})  // closed to stop the remover goroutine
	errChan := make(chan error, 10) // carries every non-nil Unlock() error to the collector

	for i := 0; i < goroutines; i++ {
		go func(id int, count int) {
			for count > 0 {
				lock := New(testLockPath)
				ok, _ := lock.TryLock()
				if !ok {
					continue
				}

				count--
				err := lock.Unlock()
				if !isExpectedError(err) {
					// Failf, not Fail: Fail does not treat its first argument as a
					// format string, and its variadic arguments must start with a string.
					assert.Failf("unexpected error on Unlock", "goroutine %d - unexpected error: %v", id, err)
				}

				if err != nil {
					errChan <- err
				}
			}

			wg.Done()
		}(i, successfulLockCount)
	}

	// two companion goroutines: the remover and the error collector
	var wgCompanion = &sync.WaitGroup{}
	wgCompanion.Add(2)

	// remover: deletes the lock file in a tight loop until stopChan is closed
	go func() {
		defer wgCompanion.Done()
		for {
			os.Remove(testLockPath)

			select {
			case <-stopChan:
				return
			default:
			}
		}
	}()

	// collector: counts errors by message until errChan is closed.
	// errs is only read by the test after wgCompanion.Wait() below.
	var errs = make(map[string]int)
	go func() {
		for err := range errChan {
			errs[err.Error()]++
		}
		wgCompanion.Done()
	}()

	// wait for the lockers first: they are the only senders on errChan,
	// so it is safe to close it afterwards
	wg.Wait()
	close(stopChan)
	close(errChan)
	wgCompanion.Wait()

	for err, count := range errs {
		t.Logf("  seen %d times: %s", count, err)
	}
}

5
go.mod
View File

@@ -3,7 +3,6 @@ module github.com/prologic/bitcask
go 1.13
require (
github.com/gofrs/flock v0.8.0
github.com/pelletier/go-toml v1.6.0 // indirect
github.com/pkg/errors v0.9.1
github.com/plar/go-adaptive-radix-tree v1.0.4
@@ -18,7 +17,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/tidwall/redcon v1.4.0
golang.org/x/exp v0.0.0-20200228211341-fcea875c7e85
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527
gopkg.in/ini.v1 v1.53.0 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
)

19
go.sum
View File

@@ -50,8 +50,6 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gofrs/flock v0.8.0 h1:MSdYClljsF3PbENUUEx85nkWfJSGfzYI9yEBZOJz6CY=
github.com/gofrs/flock v0.8.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
@@ -117,7 +115,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@@ -139,7 +136,6 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.6.0 h1:aetoXYr0Tv7xRU/V4B4IZJ2QcbtMUFoNb3ORp7TzIK4=
github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t2kKREewys=
@@ -176,35 +172,28 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.7 h1:FfTH+vuMXOas8jmfb5/M7dzEYx7LpcLb7a0LPe34uOU=
github.com/spf13/cobra v0.0.7/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@@ -294,7 +283,6 @@ golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 h1:uYVVQ9WP/Ds2ROhcaGPeIdVq0RIXVLwsHlnvJ+cT1So=
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
@@ -348,19 +336,16 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.53.0 h1:c7ruDvTQi0MUTFuNpDRXLSjs7xT4TerM1icIg4uKWRg=
gopkg.in/ini.v1 v1.53.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@@ -74,7 +74,7 @@ func getKeyValueSizes(buf []byte, maxKeySize uint32, maxValueSize uint64) (uint3
actualKeySize := binary.BigEndian.Uint32(buf[:keySize])
actualValueSize := binary.BigEndian.Uint64(buf[keySize:])
if actualKeySize > maxKeySize || actualValueSize > maxValueSize || actualKeySize == 0 {
if (maxKeySize > 0 && actualKeySize > maxKeySize) || (maxValueSize > 0 && actualValueSize > maxValueSize) || actualKeySize == 0 {
return 0, 0, errInvalidKeyOrValueSize
}

View File

@@ -13,6 +13,7 @@ const (
keySize = 4
valueSize = 8
checksumSize = 4
MetaInfoSize = keySize + valueSize + checksumSize
)
// NewEncoder creates a streaming Entry encoder.

View File

@@ -34,7 +34,7 @@ func readKeyBytes(r io.Reader, maxKeySize uint32) ([]byte, error) {
return nil, errors.Wrap(errTruncatedKeySize, err.Error())
}
size := binary.BigEndian.Uint32(s)
if size > uint32(maxKeySize) {
if maxKeySize > 0 && size > uint32(maxKeySize) {
return nil, errKeySizeTooLarge
}

View File

@@ -0,0 +1,22 @@
package metadata
import (
"os"
"github.com/prologic/bitcask/internal"
)
// MetaData is the set of values serialized to JSON by Save and read back by Load.
type MetaData struct {
// NOTE(review): presumably tells whether the persisted index matches the datafiles — confirm against callers
IndexUpToDate bool `json:"index_up_to_date"`
// NOTE(review): presumably a byte count of space a merge could reclaim — confirm against callers
ReclaimableSpace int64 `json:"reclaimable_space"`
}
// Save writes m as JSON to the file at path by delegating to
// internal.SaveJsonToFile; mode is passed through unchanged.
// It returns whatever error that helper reports.
func (m *MetaData) Save(path string, mode os.FileMode) error {
return internal.SaveJsonToFile(m, path, mode)
}
// Load reads the JSON file at path into a freshly allocated MetaData by way of
// internal.LoadFromJsonFile.
//
// The returned pointer is never nil, even when an error is returned; in that
// case the error from the helper is passed through alongside it.
func Load(path string) (*MetaData, error) {
	meta := new(MetaData)
	err := internal.LoadFromJsonFile(path, meta)
	return meta, err
}

View File

@@ -1,7 +1,9 @@
package internal
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
@@ -63,3 +65,48 @@ func ParseIds(fns []string) ([]int, error) {
sort.Ints(ids)
return ids, nil
}
// Copy recursively copies the contents of directory src into directory dst.
//
// dst itself is not created: it must already exist. Sub-directories are
// created with os.Mkdir using the source directory's mode, and files are
// copied whole, keeping the source file's mode.
//
// exclude is a list of filepath.Match patterns tested against the base name of
// every entry. A matching file is skipped; a matching directory is skipped
// together with everything below it.
//
// The first error met stops the copy and is returned: errors reported by the
// directory walk (including a missing src), a malformed pattern, or any
// read / mkdir / write failure.
func Copy(src, dst string, exclude []string) error {
	return filepath.Walk(src, func(path string, info os.FileInfo, err error) error {
		// Walk hands over lstat/readdir failures through err, and info may be
		// nil in that case: bail out before touching info.
		if err != nil {
			return err
		}

		// filepath.Rel cleans both arguments, so this also works when src is
		// not in canonical form (e.g. "./data").
		relPath, err := filepath.Rel(src, path)
		if err != nil {
			return err
		}
		if relPath == "." {
			// the root of the walk maps onto dst itself
			return nil
		}

		for _, pattern := range exclude {
			matched, err := filepath.Match(pattern, info.Name())
			if err != nil {
				return err
			}
			if !matched {
				continue
			}
			if info.IsDir() {
				// do not descend: the matching directory is not created in dst,
				// so its children would have nowhere to go
				return filepath.SkipDir
			}
			return nil
		}

		target := filepath.Join(dst, relPath)
		if info.IsDir() {
			return os.Mkdir(target, info.Mode())
		}

		data, err := ioutil.ReadFile(path)
		if err != nil {
			return err
		}
		return ioutil.WriteFile(target, data, info.Mode())
	})
}
// SaveJsonToFile serializes v with json.Marshal and writes the resulting bytes
// to the file at path using the given mode.
//
// If v cannot be marshaled, the marshaling error is returned and the file is
// not written; otherwise the result of the write is returned.
func SaveJsonToFile(v interface{}, path string, mode os.FileMode) error {
	encoded, marshalErr := json.Marshal(v)
	if marshalErr == nil {
		return ioutil.WriteFile(path, encoded, mode)
	}
	return marshalErr
}
// LoadFromJsonFile reads the whole file at path and decodes its JSON content
// into v with json.Unmarshal.
//
// A read failure is returned as is, in which case v is left untouched;
// otherwise the result of the decoding is returned.
func LoadFromJsonFile(path string, v interface{}) error {
	raw, readErr := ioutil.ReadFile(path)
	if readErr == nil {
		return json.Unmarshal(raw, v)
	}
	return readErr
}

108
internal/utils_test.go Normal file
View File

@@ -0,0 +1,108 @@
package internal
import (
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
)
// Test_Copy builds a source directory holding one sub-directory and three files,
// copies it with "file3" excluded, and checks that the destination holds the
// sub-directory, file1 and file2 with their exact content, and no file3.
func Test_Copy(t *testing.T) {
assert := assert.New(t)
t.Run("CopyDir", func(t *testing.T) {
// -- source tree
tempsrc, err := ioutil.TempDir("", "test")
assert.NoError(err)
defer os.RemoveAll(tempsrc)
var f *os.File
// a sub-directory with a generated name, created inside the source directory
tempdir, err := ioutil.TempDir(tempsrc, "")
assert.NoError(err)
f, err = os.OpenFile(filepath.Join(tempsrc, "file1"), os.O_WRONLY|os.O_CREATE, 0755)
assert.NoError(err)
n, err := f.WriteString("test123")
assert.Equal(7, n)
assert.NoError(err)
f.Close()
f, err = os.OpenFile(filepath.Join(tempsrc, "file2"), os.O_WRONLY|os.O_CREATE, 0755)
assert.NoError(err)
n, err = f.WriteString("test1234")
assert.Equal(8, n)
assert.NoError(err)
f.Close()
// file3 is left empty: it only exists to be excluded from the copy
f, err = os.OpenFile(filepath.Join(tempsrc, "file3"), os.O_WRONLY|os.O_CREATE, 0755)
assert.NoError(err)
f.Close()
// -- copy
tempdst, err := ioutil.TempDir("", "backup")
assert.NoError(err)
defer os.RemoveAll(tempdst)
err = Copy(tempsrc, tempdst, []string{"file3"})
assert.NoError(err)
// -- checks on the destination
buf := make([]byte, 10)
// the sub-directory must have been re-created under the same name
exists := Exists(filepath.Join(tempdst, filepath.Base(tempdir)))
assert.Equal(true, exists)
f, err = os.Open(filepath.Join(tempdst, "file1"))
assert.NoError(err)
n, err = f.Read(buf[:7])
assert.NoError(err)
assert.Equal(7, n)
assert.Equal([]byte("test123"), buf[:7])
// a further read must hit EOF: the copy holds nothing beyond the expected bytes
_, err = f.Read(buf)
assert.Equal(io.EOF, err)
f.Close()
f, err = os.Open(filepath.Join(tempdst, "file2"))
assert.NoError(err)
n, err = f.Read(buf[:8])
assert.NoError(err)
assert.Equal(8, n)
assert.Equal([]byte("test1234"), buf[:8])
_, err = f.Read(buf)
assert.Equal(io.EOF, err)
f.Close()
// the excluded file must not have been copied
exists = Exists(filepath.Join(tempdst, "file3"))
assert.Equal(false, exists)
})
}
// Test_SaveAndLoad exercises SaveJsonToFile and LoadFromJsonFile together:
// a successful round trip, then the error paths of both helpers.
func Test_SaveAndLoad(t *testing.T) {
assert := assert.New(t)
t.Run("save and load", func(t *testing.T) {
tempdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(tempdir)
type test struct {
Value bool `json:"value"`
}
m := test{Value: true}
err = SaveJsonToFile(&m, filepath.Join(tempdir, "meta.json"), 0755)
assert.NoError(err)
// load into a fresh value and expect it to equal what was saved
m1 := test{}
err = LoadFromJsonFile(filepath.Join(tempdir, "meta.json"), &m1)
assert.NoError(err)
assert.Equal(m, m1)
})
t.Run("save and load error", func(t *testing.T) {
tempdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(tempdir)
type test struct {
Value bool `json:"value"`
}
// a channel cannot be marshaled to JSON, so saving must fail
err = SaveJsonToFile(make(chan int), filepath.Join(tempdir, "meta.json"), 0755)
assert.Error(err)
// the failed save returned before writing, so meta.json does not exist and loading must fail too
m1 := test{}
err = LoadFromJsonFile(filepath.Join(tempdir, "meta.json"), &m1)
assert.Error(err)
})
}

View File

@@ -31,6 +31,19 @@ const (
// Option is a function that takes a config struct and modifies it
type Option func(*config.Config) error
// withConfig returns an Option that overwrites a configuration with the
// settings held by src: the size limits, the Sync and AutoRecovery flags, and
// the directory / file modes. The values are read from src at the time the
// option is applied, not when withConfig is called. The option never fails.
func withConfig(src *config.Config) Option {
	apply := func(dst *config.Config) error {
		// size limits
		dst.MaxDatafileSize = src.MaxDatafileSize
		dst.MaxKeySize = src.MaxKeySize
		dst.MaxValueSize = src.MaxValueSize

		// behavior flags
		dst.Sync = src.Sync
		dst.AutoRecovery = src.AutoRecovery

		// filesystem modes
		dst.DirFileModeBeforeUmask = src.DirFileModeBeforeUmask
		dst.FileFileModeBeforeUmask = src.FileFileModeBeforeUmask

		return nil
	}
	return apply
}
// WithAutoRecovery sets auto recovery of data and index file recreation.
// IMPORTANT: This flag MUST BE used only if a proper backup was made of all
// the existing datafiles.