mirror of
https://github.com/gogrlx/bitcask.git
synced 2026-04-02 11:09:01 -07:00
Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6e423ae179 | |||
| 8a60b5a370 | |||
|
|
29e1cf648b | ||
|
|
3a6235ea03 | ||
| 4e7414e920 | |||
| a6ed0bc12f | |||
| 0ab7d79246 | |||
|
|
38156e8461 | ||
|
|
e1cdffd8f1 | ||
| 80c06a3572 | |||
| 9172eb0f90 | |||
| c09ce153e9 | |||
| d3428bac8c | |||
|
|
158f6d9888 | ||
|
|
f4357e6f18 | ||
| 5e01d6d098 |
11
.github/workflows/auto-approve.yml
vendored
11
.github/workflows/auto-approve.yml
vendored
@@ -1,11 +0,0 @@
|
||||
name: Auto approve
|
||||
on: [pull_request]
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: hmarr/auto-approve-action@v2.0.0
|
||||
if: github.actor == 'dependabot[bot]' || github.actor == 'dependabot-preview[bot]'
|
||||
with:
|
||||
github-token: "${{ secrets.GITHUB_TOKEN }}"
|
||||
|
||||
10
.github/workflows/autoassign.yml
vendored
10
.github/workflows/autoassign.yml
vendored
@@ -1,10 +0,0 @@
|
||||
name: AutoAssigner
|
||||
on: [pull_request]
|
||||
jobs:
|
||||
assignAuthor:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: samspills/assign-pr-to-author@v1.0
|
||||
if: github.event_name == 'pull_request' && github.event.action == 'opened'
|
||||
with:
|
||||
repo-token: '${{ secrets.GITHUB_TOKEN }}'
|
||||
7
.github/workflows/go.yml
vendored
7
.github/workflows/go.yml
vendored
@@ -1,3 +1,4 @@
|
||||
---
|
||||
name: Go
|
||||
on:
|
||||
push:
|
||||
@@ -9,12 +10,12 @@ jobs:
|
||||
name: Build and Test
|
||||
strategy:
|
||||
matrix:
|
||||
go-version: [1.12.x, 1.13.x, 1.14.x]
|
||||
platform: [ubuntu-latest, macos-latest, windows-latest]
|
||||
go-version: [1.12.x, 1.13.x, 1.14.x, 1.15.x]
|
||||
platform: [ubuntu-latest, macos-latest]
|
||||
runs-on: ${{ matrix.platform }}
|
||||
steps:
|
||||
- name: Setup Go ${{ matrix.go-version }}
|
||||
uses: actions/setup-go@v1
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ${{ matrix.go-version }}
|
||||
id: go
|
||||
|
||||
42
.github/workflows/greetings.yml
vendored
42
.github/workflows/greetings.yml
vendored
@@ -1,42 +0,0 @@
|
||||
name: Greetings
|
||||
on: [pull_request, issues]
|
||||
jobs:
|
||||
greeting:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/first-interaction@v1
|
||||
with:
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
issue-message: |-
|
||||
Hello!
|
||||
|
||||
Welcome to the Bitcask project! Someone will respond to your issue pretty quickly,
|
||||
the author is pretty responsive 😀 In the meantime; please make sure you have read
|
||||
the [Contributing](https://github.com/prologic/bitcask/blob/master/CONTRIBUTING.md)
|
||||
and [Code of Conduct](https://github.com/prologic/bitcask/blob/master/CODE_OF_CONDUCT.md)
|
||||
documents.
|
||||
|
||||
If possible please also make sure your Bug Report or Feature Request is clearly defined
|
||||
with either examples or a reproducible case (_if a bug_).
|
||||
|
||||
Thank you 😃
|
||||
pr-message: |-
|
||||
Hello!
|
||||
|
||||
Welcome to the Bitcask project!
|
||||
|
||||
Thank you for your Pull Request and Contribution! We highly value all contributions to this Project!
|
||||
Your Pull Request will be reviewed shortly! The author is pretty responsive 😀
|
||||
In the meantime; please make sure you have read
|
||||
the [Contributing](https://github.com/prologic/bitcask/blob/master/CONTRIBUTING.md)
|
||||
and [Code of Conduct](https://github.com/prologic/bitcask/blob/master/CODE_OF_CONDUCT.md)
|
||||
documents.
|
||||
|
||||
Please also ensure that your PR passes all the CI/CD tests -- They will appear in your PR
|
||||
towards the bottom just above the comment box.
|
||||
|
||||
Also in addition, if you haven't already; please ammend your PR by modifying the
|
||||
[AUTHORS](https://github.com/prologic/bitcask/blob/master/AUTHORS) file and adding
|
||||
yourself to it! We like to recognize and peserve in Git history all contributors!
|
||||
|
||||
Thank you 😃
|
||||
9
.github/workflows/label.yml
vendored
9
.github/workflows/label.yml
vendored
@@ -1,9 +0,0 @@
|
||||
name: Labeler
|
||||
on: [pull_request]
|
||||
jobs:
|
||||
label:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/labeler@v2
|
||||
with:
|
||||
repo-token: "${{ secrets.GITHUB_TOKEN }}"
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -8,5 +8,6 @@
|
||||
|
||||
/bitcask
|
||||
/bitcaskd
|
||||
/bitcask_bench*
|
||||
/cmd/bitcask/bitcask
|
||||
/cmd/bitcaskd/bitcaskd
|
||||
|
||||
24
CHANGELOG.md
24
CHANGELOG.md
@@ -1,4 +1,24 @@
|
||||
|
||||
<a name="v0.3.10"></a>
|
||||
## [v0.3.10](https://github.com/prologic/bitcask/compare/v0.3.9...v0.3.10) (2020-12-18)
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* Fix a bug when MaxValueSize == 0 on Merge operations
|
||||
* Fix link to bitcask-bench
|
||||
* Fix CI (again)
|
||||
* Fix CI
|
||||
|
||||
### Features
|
||||
|
||||
* Add support for unlimited key/value sizes
|
||||
* Add a few more test cases for concurrent operations
|
||||
|
||||
### Updates
|
||||
|
||||
* Update README.md
|
||||
|
||||
|
||||
<a name="v0.3.9"></a>
|
||||
## [v0.3.9](https://github.com/prologic/bitcask/compare/v0.3.8...v0.3.9) (2020-11-17)
|
||||
|
||||
@@ -6,6 +26,10 @@
|
||||
|
||||
* Fix a race condition around .Close() and .Sync()
|
||||
|
||||
### Updates
|
||||
|
||||
* Update CHANGELOG for v0.3.9
|
||||
|
||||
|
||||
<a name="v0.3.8"></a>
|
||||
## [v0.3.8](https://github.com/prologic/bitcask/compare/v0.3.7...v0.3.8) (2020-11-17)
|
||||
|
||||
@@ -194,6 +194,12 @@ You can find an [AUTHORS](/AUTHORS) file where we keep a list of contributors to
|
||||
[](https://sourcerer.io/fame/prologic/prologic/bitcask/links/6)
|
||||
[](https://sourcerer.io/fame/prologic/prologic/bitcask/links/7)
|
||||
|
||||
## Related Projects
|
||||
|
||||
- [bitraft](https://github.com/prologic/bitraft) -- A Distributed Key/Value store (_using Raft_) with a Redis compatible protocol.
|
||||
- [bitcaskfs](https://github.com/prologic/bitcaskfs) -- A FUSE filesystem for mounting a Bitcask database.
|
||||
- [bitcask-bench](https://github.com/prologic/bitcask-bench) -- A benchmarking tool comparing Bitcask and several other Go key/value libraries.
|
||||
|
||||
## License
|
||||
|
||||
bitcask is licensed under the term of the [MIT License](https://github.com/prologic/bitcask/blob/master/LICENSE)
|
||||
|
||||
307
bitcask.go
307
bitcask.go
@@ -12,12 +12,14 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/gofrs/flock"
|
||||
art "github.com/plar/go-adaptive-radix-tree"
|
||||
"github.com/prologic/bitcask/flock"
|
||||
"github.com/prologic/bitcask/internal"
|
||||
"github.com/prologic/bitcask/internal/config"
|
||||
"github.com/prologic/bitcask/internal/data"
|
||||
"github.com/prologic/bitcask/internal/data/codec"
|
||||
"github.com/prologic/bitcask/internal/index"
|
||||
"github.com/prologic/bitcask/internal/metadata"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -42,6 +44,10 @@ var (
|
||||
// ErrDatabaseLocked is the error returned if the database is locked
|
||||
// (typically opened by another process)
|
||||
ErrDatabaseLocked = errors.New("error: database locked")
|
||||
|
||||
// ErrMergeInProgress is the error returned if merge is called when already a merge
|
||||
// is in progress
|
||||
ErrMergeInProgress = errors.New("error: merge already in progress")
|
||||
)
|
||||
|
||||
// Bitcask is a struct that represents a on-disk LSM and WAL data structure
|
||||
@@ -59,6 +65,8 @@ type Bitcask struct {
|
||||
datafiles map[int]data.Datafile
|
||||
trie art.Tree
|
||||
indexer index.Indexer
|
||||
metadata *metadata.MetaData
|
||||
isMerging bool
|
||||
}
|
||||
|
||||
// Stats is a struct returned by Stats() on an open Bitcask instance
|
||||
@@ -88,14 +96,19 @@ func (b *Bitcask) Stats() (stats Stats, err error) {
|
||||
// database.
|
||||
func (b *Bitcask) Close() error {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
|
||||
defer func() {
|
||||
b.mu.RUnlock()
|
||||
b.Flock.Unlock()
|
||||
os.Remove(b.Flock.Path())
|
||||
}()
|
||||
return b.close()
|
||||
}
|
||||
|
||||
if err := b.indexer.Save(b.trie, filepath.Join(b.path, "index")); err != nil {
|
||||
func (b *Bitcask) close() error {
|
||||
defer b.Flock.Unlock()
|
||||
if err := b.saveIndex(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.metadata.IndexUpToDate = true
|
||||
if err := b.saveMetadata(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -112,18 +125,29 @@ func (b *Bitcask) Close() error {
|
||||
func (b *Bitcask) Sync() error {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
|
||||
if err := b.saveMetadata(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return b.curr.Sync()
|
||||
}
|
||||
|
||||
// Get retrieves the value of the given key. If the key is not found or an/I/O
|
||||
// error occurs a null byte slice is returned along with the error.
|
||||
// Get fetches value for given key
|
||||
func (b *Bitcask) Get(key []byte) ([]byte, error) {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
|
||||
return b.get(key)
|
||||
}
|
||||
|
||||
// get retrieves the value of the given key. If the key is not found or an/I/O
|
||||
// error occurs a null byte slice is returned along with the error.
|
||||
func (b *Bitcask) get(key []byte) ([]byte, error) {
|
||||
var df data.Datafile
|
||||
|
||||
b.mu.RLock()
|
||||
value, found := b.trie.Search(key)
|
||||
if !found {
|
||||
b.mu.RUnlock()
|
||||
return nil, ErrKeyNotFound
|
||||
}
|
||||
|
||||
@@ -136,7 +160,6 @@ func (b *Bitcask) Get(key []byte) ([]byte, error) {
|
||||
}
|
||||
|
||||
e, err := df.ReadAt(item.Offset, item.Size)
|
||||
b.mu.RUnlock()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -162,30 +185,35 @@ func (b *Bitcask) Put(key, value []byte) error {
|
||||
if len(key) == 0 {
|
||||
return ErrEmptyKey
|
||||
}
|
||||
if uint32(len(key)) > b.config.MaxKeySize {
|
||||
if b.config.MaxKeySize > 0 && uint32(len(key)) > b.config.MaxKeySize {
|
||||
return ErrKeyTooLarge
|
||||
}
|
||||
if uint64(len(value)) > b.config.MaxValueSize {
|
||||
if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize {
|
||||
return ErrValueTooLarge
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
offset, n, err := b.put(key, value)
|
||||
if err != nil {
|
||||
b.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
if b.config.Sync {
|
||||
if err := b.curr.Sync(); err != nil {
|
||||
b.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// in case of successful `put`, IndexUpToDate will be always be false
|
||||
b.metadata.IndexUpToDate = false
|
||||
|
||||
if oldItem, found := b.trie.Search(key); found {
|
||||
b.metadata.ReclaimableSpace += oldItem.(internal.Item).Size
|
||||
}
|
||||
|
||||
item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
|
||||
b.trie.Insert(key, item)
|
||||
b.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -199,6 +227,9 @@ func (b *Bitcask) Delete(key []byte) error {
|
||||
b.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
if item, found := b.trie.Search(key); found {
|
||||
b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(key))
|
||||
}
|
||||
b.trie.Delete(key)
|
||||
b.mu.Unlock()
|
||||
|
||||
@@ -212,7 +243,12 @@ func (b *Bitcask) DeleteAll() (err error) {
|
||||
|
||||
b.trie.ForEach(func(node art.Node) bool {
|
||||
_, _, err = b.put(node.Key(), []byte{})
|
||||
return err == nil
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
item, _ := b.trie.Search(node.Key())
|
||||
b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(node.Key()))
|
||||
return true
|
||||
})
|
||||
b.trie = art.New()
|
||||
|
||||
@@ -302,21 +338,58 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
|
||||
return -1, 0, err
|
||||
}
|
||||
b.curr = curr
|
||||
err = b.saveIndex()
|
||||
if err != nil {
|
||||
return -1, 0, err
|
||||
}
|
||||
}
|
||||
|
||||
e := internal.NewEntry(key, value)
|
||||
return b.curr.Write(e)
|
||||
}
|
||||
|
||||
// closeCurrentFile closes current datafile and makes it read only.
|
||||
func (b *Bitcask) closeCurrentFile() error {
|
||||
err := b.curr.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
id := b.curr.FileID()
|
||||
df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.datafiles[id] = df
|
||||
return nil
|
||||
}
|
||||
|
||||
// openNewWritableFile opens new datafile for writing data
|
||||
func (b *Bitcask) openNewWritableFile() error {
|
||||
id := b.curr.FileID() + 1
|
||||
curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.curr = curr
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Bitcask) Reopen() error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
return b.reopen()
|
||||
}
|
||||
|
||||
// reopen reloads a bitcask object with index and datafiles
|
||||
// caller of this method should take care of locking
|
||||
func (b *Bitcask) reopen() error {
|
||||
datafiles, lastID, err := loadDatafiles(b.path, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles)
|
||||
t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles, lastID, b.metadata.IndexUpToDate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -337,6 +410,34 @@ func (b *Bitcask) Reopen() error {
|
||||
// and deleted keys removes. Duplicate key/value pairs are also removed.
|
||||
// Call this function periodically to reclaim disk space.
|
||||
func (b *Bitcask) Merge() error {
|
||||
b.mu.Lock()
|
||||
if b.isMerging {
|
||||
b.mu.Unlock()
|
||||
return ErrMergeInProgress
|
||||
}
|
||||
b.isMerging = true
|
||||
b.mu.Unlock()
|
||||
defer func() {
|
||||
b.isMerging = false
|
||||
}()
|
||||
b.mu.RLock()
|
||||
err := b.closeCurrentFile()
|
||||
if err != nil {
|
||||
b.mu.RUnlock()
|
||||
return err
|
||||
}
|
||||
filesToMerge := make([]int, 0, len(b.datafiles))
|
||||
for k := range b.datafiles {
|
||||
filesToMerge = append(filesToMerge, k)
|
||||
}
|
||||
err = b.openNewWritableFile()
|
||||
if err != nil {
|
||||
b.mu.RUnlock()
|
||||
return err
|
||||
}
|
||||
b.mu.RUnlock()
|
||||
sort.Ints(filesToMerge)
|
||||
|
||||
// Temporary merged database path
|
||||
temp, err := ioutil.TempDir(b.path, "merge")
|
||||
if err != nil {
|
||||
@@ -345,7 +446,7 @@ func (b *Bitcask) Merge() error {
|
||||
defer os.RemoveAll(temp)
|
||||
|
||||
// Create a merged database
|
||||
mdb, err := Open(temp, b.options...)
|
||||
mdb, err := Open(temp, withConfig(b.config))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -354,7 +455,12 @@ func (b *Bitcask) Merge() error {
|
||||
// Doing this automatically strips deleted keys and
|
||||
// old key/value pairs
|
||||
err = b.Fold(func(key []byte) error {
|
||||
value, err := b.Get(key)
|
||||
item, _ := b.trie.Search(key)
|
||||
// if key was updated after start of merge operation, nothing to do
|
||||
if item.(internal.Item).FileID > filesToMerge[len(filesToMerge)-1] {
|
||||
return nil
|
||||
}
|
||||
value, err := b.get(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -368,29 +474,36 @@ func (b *Bitcask) Merge() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = mdb.Close()
|
||||
if err != nil {
|
||||
if err = mdb.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
// no reads and writes till we reopen
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if err = b.close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Close the database
|
||||
err = b.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove all data files
|
||||
// Remove data files
|
||||
files, err := ioutil.ReadDir(b.path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, file := range files {
|
||||
if !file.IsDir() {
|
||||
err := os.RemoveAll(path.Join([]string{b.path, file.Name()}...))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if file.IsDir() {
|
||||
continue
|
||||
}
|
||||
ids, err := internal.ParseIds([]string{file.Name()})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// if datafile was created after start of merge, skip
|
||||
if len(ids) > 0 && ids[0] > filesToMerge[len(filesToMerge)-1] {
|
||||
continue
|
||||
}
|
||||
err = os.RemoveAll(path.Join(b.path, file.Name()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -408,9 +521,10 @@ func (b *Bitcask) Merge() error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
b.metadata.ReclaimableSpace = 0
|
||||
|
||||
// And finally reopen the database
|
||||
return b.Reopen()
|
||||
return b.reopen()
|
||||
}
|
||||
|
||||
// Open opens the database at the given path with optional options.
|
||||
@@ -418,8 +532,9 @@ func (b *Bitcask) Merge() error {
|
||||
// configuration options as functions.
|
||||
func Open(path string, options ...Option) (*Bitcask, error) {
|
||||
var (
|
||||
cfg *config.Config
|
||||
err error
|
||||
cfg *config.Config
|
||||
err error
|
||||
meta *metadata.MetaData
|
||||
)
|
||||
|
||||
configPath := filepath.Join(path, "config.json")
|
||||
@@ -442,12 +557,18 @@ func Open(path string, options ...Option) (*Bitcask, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
meta, err = loadMetadata(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bitcask := &Bitcask{
|
||||
Flock: flock.New(filepath.Join(path, "lock")),
|
||||
config: cfg,
|
||||
options: options,
|
||||
path: path,
|
||||
indexer: index.NewIndexer(),
|
||||
Flock: flock.New(filepath.Join(path, "lock")),
|
||||
config: cfg,
|
||||
options: options,
|
||||
path: path,
|
||||
indexer: index.NewIndexer(),
|
||||
metadata: meta,
|
||||
}
|
||||
|
||||
locked, err := bitcask.Flock.TryLock()
|
||||
@@ -475,6 +596,36 @@ func Open(path string, options ...Option) (*Bitcask, error) {
|
||||
return bitcask, nil
|
||||
}
|
||||
|
||||
// Backup copies db directory to given path
|
||||
// it creates path if it does not exist
|
||||
func (b *Bitcask) Backup(path string) error {
|
||||
if !internal.Exists(path) {
|
||||
if err := os.MkdirAll(path, b.config.DirFileModeBeforeUmask); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return internal.Copy(b.path, path, []string{"lock"})
|
||||
}
|
||||
|
||||
// saveIndex saves index currently in RAM to disk
|
||||
func (b *Bitcask) saveIndex() error {
|
||||
tempIdx := "temp_index"
|
||||
if err := b.indexer.Save(b.trie, filepath.Join(b.path, tempIdx)); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, "index"))
|
||||
}
|
||||
|
||||
// saveMetadata saves metadata into disk
|
||||
func (b *Bitcask) saveMetadata() error {
|
||||
return b.metadata.Save(filepath.Join(b.path, "meta.json"), b.config.DirFileModeBeforeUmask)
|
||||
}
|
||||
|
||||
// Reclaimable returns space that can be reclaimed
|
||||
func (b *Bitcask) Reclaimable() int64 {
|
||||
return b.metadata.ReclaimableSpace
|
||||
}
|
||||
|
||||
func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64, fileModeBeforeUmask os.FileMode) (datafiles map[int]data.Datafile, lastID int, err error) {
|
||||
fns, err := internal.GetDatafiles(path)
|
||||
if err != nil {
|
||||
@@ -513,34 +664,56 @@ func getSortedDatafiles(datafiles map[int]data.Datafile) []data.Datafile {
|
||||
return out
|
||||
}
|
||||
|
||||
func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile) (art.Tree, error) {
|
||||
func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile, lastID int, indexUpToDate bool) (art.Tree, error) {
|
||||
t, found, err := indexer.Load(filepath.Join(path, "index"), maxKeySize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !found {
|
||||
sortedDatafiles := getSortedDatafiles(datafiles)
|
||||
for _, df := range sortedDatafiles {
|
||||
var offset int64
|
||||
for {
|
||||
e, n, err := df.Read()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
// Tombstone value (deleted key)
|
||||
if len(e.Value) == 0 {
|
||||
t.Delete(e.Key)
|
||||
offset += n
|
||||
continue
|
||||
}
|
||||
item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
|
||||
t.Insert(e.Key, item)
|
||||
offset += n
|
||||
}
|
||||
if found && indexUpToDate {
|
||||
return t, nil
|
||||
}
|
||||
if found {
|
||||
if err := loadIndexFromDatafile(t, datafiles[lastID]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
sortedDatafiles := getSortedDatafiles(datafiles)
|
||||
for _, df := range sortedDatafiles {
|
||||
if err := loadIndexFromDatafile(t, df); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func loadIndexFromDatafile(t art.Tree, df data.Datafile) error {
|
||||
var offset int64
|
||||
for {
|
||||
e, n, err := df.Read()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return err
|
||||
}
|
||||
// Tombstone value (deleted key)
|
||||
if len(e.Value) == 0 {
|
||||
t.Delete(e.Key)
|
||||
offset += n
|
||||
continue
|
||||
}
|
||||
item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
|
||||
t.Insert(e.Key, item)
|
||||
offset += n
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func loadMetadata(path string) (*metadata.MetaData, error) {
|
||||
if !internal.Exists(filepath.Join(path, "meta.json")) {
|
||||
meta := new(metadata.MetaData)
|
||||
return meta, nil
|
||||
}
|
||||
return metadata.Load(filepath.Join(path, "meta.json"))
|
||||
}
|
||||
|
||||
187
bitcask_test.go
187
bitcask_test.go
@@ -9,7 +9,6 @@ import (
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -53,12 +52,6 @@ func SortByteArrays(src [][]byte) [][]byte {
|
||||
return sorted
|
||||
}
|
||||
|
||||
func skipIfWindows(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("Skipping this test on Windows")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAll(t *testing.T) {
|
||||
var (
|
||||
db *Bitcask
|
||||
@@ -136,6 +129,14 @@ func TestAll(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
})
|
||||
|
||||
t.Run("Backup", func(t *testing.T) {
|
||||
path, err := ioutil.TempDir("", "backup")
|
||||
defer os.RemoveAll(path)
|
||||
assert.NoError(err)
|
||||
err = db.Backup(filepath.Join(path, "db-backup"))
|
||||
assert.NoError(err)
|
||||
})
|
||||
|
||||
t.Run("Close", func(t *testing.T) {
|
||||
err = db.Close()
|
||||
assert.NoError(err)
|
||||
@@ -306,6 +307,64 @@ func TestDeletedKeys(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestMetadata(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
testdir, err := ioutil.TempDir("", "bitcask")
|
||||
assert.NoError(err)
|
||||
defer os.RemoveAll(testdir)
|
||||
|
||||
db, err := Open(testdir)
|
||||
assert.NoError(err)
|
||||
err = db.Put([]byte("foo"), []byte("bar"))
|
||||
assert.NoError(err)
|
||||
err = db.Close()
|
||||
assert.NoError(err)
|
||||
db, err = Open(testdir)
|
||||
assert.NoError(err)
|
||||
|
||||
t.Run("IndexUptoDateAfterCloseAndOpen", func(t *testing.T) {
|
||||
assert.Equal(true, db.metadata.IndexUpToDate)
|
||||
})
|
||||
t.Run("IndexUptoDateAfterPut", func(t *testing.T) {
|
||||
assert.NoError(db.Put([]byte("foo1"), []byte("bar1")))
|
||||
assert.Equal(false, db.metadata.IndexUpToDate)
|
||||
})
|
||||
t.Run("Reclaimable", func(t *testing.T) {
|
||||
assert.Equal(int64(0), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterNewPut", func(t *testing.T) {
|
||||
assert.NoError(db.Put([]byte("hello"), []byte("world")))
|
||||
assert.Equal(int64(0), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterRepeatedPut", func(t *testing.T) {
|
||||
assert.NoError(db.Put([]byte("hello"), []byte("world")))
|
||||
assert.Equal(int64(26), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterDelete", func(t *testing.T) {
|
||||
assert.NoError(db.Delete([]byte("hello")))
|
||||
assert.Equal(int64(73), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterNonExistingDelete", func(t *testing.T) {
|
||||
assert.NoError(db.Delete([]byte("hello1")))
|
||||
assert.Equal(int64(73), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterDeleteAll", func(t *testing.T) {
|
||||
assert.NoError(db.DeleteAll())
|
||||
assert.Equal(int64(158), db.Reclaimable())
|
||||
})
|
||||
t.Run("ReclaimableAfterMerge", func(t *testing.T) {
|
||||
assert.NoError(db.Merge())
|
||||
assert.Equal(int64(0), db.Reclaimable())
|
||||
})
|
||||
t.Run("IndexUptoDateAfterMerge", func(t *testing.T) {
|
||||
assert.Equal(true, db.metadata.IndexUpToDate)
|
||||
})
|
||||
t.Run("ReclaimableAfterMergeAndDeleteAll", func(t *testing.T) {
|
||||
assert.NoError(db.DeleteAll())
|
||||
assert.Equal(int64(0), db.Reclaimable())
|
||||
})
|
||||
}
|
||||
|
||||
func TestConfigErrors(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
@@ -337,9 +396,6 @@ func TestConfigErrors(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAutoRecovery(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.SkipNow()
|
||||
}
|
||||
withAutoRecovery := []bool{false, true}
|
||||
|
||||
for _, autoRecovery := range withAutoRecovery {
|
||||
@@ -577,6 +633,11 @@ func TestSync(t *testing.T) {
|
||||
value := []byte("foobar")
|
||||
err = db.Put(key, value)
|
||||
})
|
||||
|
||||
t.Run("Put", func(t *testing.T) {
|
||||
err = db.Put([]byte("hello"), []byte("world"))
|
||||
assert.NoError(err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMaxKeySize(t *testing.T) {
|
||||
@@ -707,8 +768,6 @@ func TestStatsError(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("Test", func(t *testing.T) {
|
||||
skipIfWindows(t)
|
||||
|
||||
t.Run("FabricatedDestruction", func(t *testing.T) {
|
||||
// This would never happen in reality :D
|
||||
// Or would it? :)
|
||||
@@ -724,8 +783,6 @@ func TestStatsError(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDirFileModeBeforeUmask(t *testing.T) {
|
||||
skipIfWindows(t)
|
||||
|
||||
assert := assert.New(t)
|
||||
|
||||
t.Run("Setup", func(t *testing.T) {
|
||||
@@ -808,8 +865,6 @@ func TestDirFileModeBeforeUmask(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFileFileModeBeforeUmask(t *testing.T) {
|
||||
skipIfWindows(t)
|
||||
|
||||
assert := assert.New(t)
|
||||
|
||||
t.Run("Setup", func(t *testing.T) {
|
||||
@@ -984,7 +1039,7 @@ func TestMerge(t *testing.T) {
|
||||
|
||||
s3, err := db.Stats()
|
||||
assert.NoError(err)
|
||||
assert.Equal(1, s3.Datafiles)
|
||||
assert.Equal(2, s3.Datafiles)
|
||||
assert.Equal(1, s3.Keys)
|
||||
assert.True(s3.Size > s1.Size)
|
||||
assert.True(s3.Size < s2.Size)
|
||||
@@ -1208,7 +1263,7 @@ func TestCloseErrors(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
|
||||
mockIndexer := new(mocks.Indexer)
|
||||
mockIndexer.On("Save", db.trie, filepath.Join(db.path, "index")).Return(ErrMockError)
|
||||
mockIndexer.On("Save", db.trie, filepath.Join(db.path, "temp_index")).Return(ErrMockError)
|
||||
db.indexer = mockIndexer
|
||||
|
||||
err = db.Close()
|
||||
@@ -1279,8 +1334,6 @@ func TestMergeErrors(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
t.Run("RemoveDatabaseDirectory", func(t *testing.T) {
|
||||
skipIfWindows(t)
|
||||
|
||||
testdir, err := ioutil.TempDir("", "bitcask")
|
||||
assert.NoError(err)
|
||||
defer os.RemoveAll(testdir)
|
||||
@@ -1316,18 +1369,19 @@ func TestMergeErrors(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
defer os.RemoveAll(testdir)
|
||||
|
||||
db, err := Open(testdir)
|
||||
db, err := Open(testdir, WithMaxDatafileSize(22))
|
||||
assert.NoError(err)
|
||||
|
||||
assert.NoError(db.Put([]byte("foo"), []byte("bar")))
|
||||
assert.NoError(db.Put([]byte("bar"), []byte("baz")))
|
||||
|
||||
mockDatafile := new(mocks.Datafile)
|
||||
mockDatafile.On("FileID").Return(0)
|
||||
mockDatafile.On("Close").Return(nil)
|
||||
mockDatafile.On("ReadAt", int64(0), int64(22)).Return(
|
||||
internal.Entry{},
|
||||
ErrMockError,
|
||||
)
|
||||
db.curr = mockDatafile
|
||||
db.datafiles[0] = mockDatafile
|
||||
|
||||
err = db.Merge()
|
||||
assert.Error(err)
|
||||
@@ -1406,6 +1460,92 @@ func TestConcurrent(t *testing.T) {
|
||||
wg.Wait()
|
||||
})
|
||||
|
||||
// Test concurrent Put() with concurrent Scan()
|
||||
t.Run("PutScan", func(t *testing.T) {
|
||||
doPut := func(wg *sync.WaitGroup, x int) {
|
||||
defer func() {
|
||||
wg.Done()
|
||||
}()
|
||||
for i := 0; i <= 100; i++ {
|
||||
if i%x == 0 {
|
||||
key := []byte(fmt.Sprintf("k%d", i))
|
||||
value := []byte(fmt.Sprintf("v%d", i))
|
||||
err := db.Put(key, value)
|
||||
assert.NoError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
doScan := func(wg *sync.WaitGroup, x int) {
|
||||
defer func() {
|
||||
wg.Done()
|
||||
}()
|
||||
for i := 0; i <= 100; i++ {
|
||||
if i%x == 0 {
|
||||
err := db.Scan([]byte("k"), func(key []byte) error {
|
||||
return nil
|
||||
})
|
||||
assert.NoError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(6)
|
||||
|
||||
go doPut(wg, 2)
|
||||
go doPut(wg, 3)
|
||||
go doPut(wg, 5)
|
||||
go doScan(wg, 1)
|
||||
go doScan(wg, 2)
|
||||
go doScan(wg, 4)
|
||||
|
||||
wg.Wait()
|
||||
})
|
||||
|
||||
// XXX: This has data races
|
||||
/* Test concurrent Scan() with concurrent Merge()
|
||||
t.Run("ScanMerge", func(t *testing.T) {
|
||||
doScan := func(wg *sync.WaitGroup, x int) {
|
||||
defer func() {
|
||||
wg.Done()
|
||||
}()
|
||||
for i := 0; i <= 100; i++ {
|
||||
if i%x == 0 {
|
||||
err := db.Scan([]byte("k"), func(key []byte) error {
|
||||
return nil
|
||||
})
|
||||
assert.NoError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
doMerge := func(wg *sync.WaitGroup, x int) {
|
||||
defer func() {
|
||||
wg.Done()
|
||||
}()
|
||||
for i := 0; i <= 100; i++ {
|
||||
if i%x == 0 {
|
||||
err := db.Merge()
|
||||
assert.NoError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(6)
|
||||
|
||||
go doScan(wg, 2)
|
||||
go doScan(wg, 3)
|
||||
go doScan(wg, 5)
|
||||
go doMerge(wg, 1)
|
||||
go doMerge(wg, 2)
|
||||
go doMerge(wg, 4)
|
||||
|
||||
wg.Wait()
|
||||
})
|
||||
*/
|
||||
|
||||
t.Run("Close", func(t *testing.T) {
|
||||
err = db.Close()
|
||||
assert.NoError(err)
|
||||
@@ -1485,7 +1625,6 @@ func TestLocking(t *testing.T) {
|
||||
|
||||
_, err = Open(testdir)
|
||||
assert.Error(err)
|
||||
assert.Equal(ErrDatabaseLocked, err)
|
||||
}
|
||||
|
||||
type benchmarkTestCase struct {
|
||||
|
||||
97
flock/flock.go
Normal file
97
flock/flock.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package flock
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Flock struct {
|
||||
path string
|
||||
m sync.Mutex
|
||||
fh *os.File
|
||||
}
|
||||
|
||||
var (
|
||||
ErrAlreadyLocked = errors.New("Double lock: already own the lock")
|
||||
ErrLockFailed = errors.New("Could not acquire lock")
|
||||
ErrLockNotHeld = errors.New("Could not unlock, lock is not held")
|
||||
|
||||
ErrInodeChangedAtPath = errors.New("Inode changed at path")
|
||||
)
|
||||
|
||||
// New returns a new instance of *Flock. The only parameter
|
||||
// it takes is the path to the desired lockfile.
|
||||
func New(path string) *Flock {
|
||||
return &Flock{path: path}
|
||||
}
|
||||
|
||||
// Path returns the file path linked to this lock.
|
||||
func (f *Flock) Path() string {
|
||||
return f.path
|
||||
}
|
||||
|
||||
// Lock will acquire the lock. This function may block indefinitely if some other process holds the lock. For a non-blocking version, see Flock.TryLock().
|
||||
func (f *Flock) Lock() error {
|
||||
f.m.Lock()
|
||||
defer f.m.Unlock()
|
||||
|
||||
if f.fh != nil {
|
||||
return ErrAlreadyLocked
|
||||
}
|
||||
|
||||
var fh *os.File
|
||||
|
||||
fh, err := lock_sys(f.path, false)
|
||||
// treat "ErrInodeChangedAtPath" as "some other process holds the lock, retry locking"
|
||||
for err == ErrInodeChangedAtPath {
|
||||
fh, err = lock_sys(f.path, false)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if fh == nil {
|
||||
return ErrLockFailed
|
||||
}
|
||||
|
||||
f.fh = fh
|
||||
return nil
|
||||
}
|
||||
|
||||
// TryLock will try to acquire the lock, and returns immediately if the lock is already owned by another process.
|
||||
func (f *Flock) TryLock() (bool, error) {
|
||||
f.m.Lock()
|
||||
defer f.m.Unlock()
|
||||
|
||||
if f.fh != nil {
|
||||
return false, ErrAlreadyLocked
|
||||
}
|
||||
|
||||
fh, err := lock_sys(f.path, true)
|
||||
if err != nil {
|
||||
return false, ErrLockFailed
|
||||
}
|
||||
|
||||
f.fh = fh
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Unlock removes the lock file from disk and releases the lock.
|
||||
// Whatever the result of `.Unlock()`, the caller must assume that it does not hold the lock anymore.
|
||||
func (f *Flock) Unlock() error {
|
||||
f.m.Lock()
|
||||
defer f.m.Unlock()
|
||||
|
||||
if f.fh == nil {
|
||||
return ErrLockNotHeld
|
||||
}
|
||||
|
||||
err1 := rm_if_match(f.fh, f.path)
|
||||
err2 := f.fh.Close()
|
||||
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
return err2
|
||||
}
|
||||
121
flock/flock_test.go
Normal file
121
flock/flock_test.go
Normal file
@@ -0,0 +1,121 @@
|
||||
package flock
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// WARNING : this test will delete the file located at "testLockPath". Choose an adequate temporary file name.
|
||||
const testLockPath = "/tmp/bitcask_unit_test_lock" // file path to use for the lock
|
||||
|
||||
func TestTryLock(t *testing.T) {
|
||||
// test that basic locking functionnalities are consistent
|
||||
|
||||
// make sure there is no present lock when startng this test
|
||||
os.Remove(testLockPath)
|
||||
|
||||
assert := assert.New(t)
|
||||
|
||||
lock1 := New(testLockPath)
|
||||
lock2 := New(testLockPath)
|
||||
|
||||
// 1- take the first lock
|
||||
locked1, err := lock1.TryLock()
|
||||
assert.True(locked1)
|
||||
assert.NoError(err)
|
||||
|
||||
// 2- check that the second lock cannot acquire the lock
|
||||
locked2, err := lock2.TryLock()
|
||||
assert.False(locked2)
|
||||
assert.Error(err)
|
||||
|
||||
// 3- release the first lock
|
||||
err = lock1.Unlock()
|
||||
assert.NoError(err)
|
||||
|
||||
// 4- check that the second lock can acquire and then release the lock without error
|
||||
locked2, err = lock2.TryLock()
|
||||
assert.True(locked2)
|
||||
assert.NoError(err)
|
||||
|
||||
err = lock2.Unlock()
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
func TestLock(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
// make sure there is no present lock when startng this test
|
||||
os.Remove(testLockPath)
|
||||
|
||||
syncChan := make(chan bool)
|
||||
|
||||
// main goroutine: take lock on testPath
|
||||
lock := New(testLockPath)
|
||||
|
||||
err := lock.Lock()
|
||||
assert.NoError(err)
|
||||
|
||||
go func() {
|
||||
// sub routine:
|
||||
lock := New(testLockPath)
|
||||
|
||||
// before entering the block '.Lock()' call, signal we are about to do it
|
||||
// see below : the main goroutine will wait for a small delay before releasing the lock
|
||||
syncChan <- true
|
||||
// '.Lock()' should ultimately return without error :
|
||||
err := lock.Lock()
|
||||
assert.NoError(err)
|
||||
|
||||
err = lock.Unlock()
|
||||
assert.NoError(err)
|
||||
|
||||
close(syncChan)
|
||||
}()
|
||||
|
||||
// wait for the "ready" signal from the sub routine,
|
||||
<-syncChan
|
||||
|
||||
// after that signal wait for a small delay before releasing the lock
|
||||
<-time.After(100 * time.Microsecond)
|
||||
err = lock.Unlock()
|
||||
assert.NoError(err)
|
||||
|
||||
// wait for the sub routine to finish
|
||||
<-syncChan
|
||||
}
|
||||
|
||||
func TestErrorConditions(t *testing.T) {
|
||||
// error conditions implemented in this version :
|
||||
// - you can't release a lock you do not hold
|
||||
// - you can't lock twice the same lock
|
||||
|
||||
// -- setup
|
||||
assert := assert.New(t)
|
||||
|
||||
// make sure there is no present lock when startng this test
|
||||
os.Remove(testLockPath)
|
||||
|
||||
lock := New(testLockPath)
|
||||
|
||||
// -- run tests :
|
||||
|
||||
err := lock.Unlock()
|
||||
assert.Error(err, "you can't release a lock you do not hold")
|
||||
|
||||
// take the lock once:
|
||||
lock.TryLock()
|
||||
|
||||
locked, err := lock.TryLock()
|
||||
assert.False(locked)
|
||||
assert.Error(err, "you can't lock twice the same lock (using .TryLock())")
|
||||
|
||||
err = lock.Lock()
|
||||
assert.Error(err, "you can't lock twice the same lock (using .Lock())")
|
||||
|
||||
// -- teardown
|
||||
lock.Unlock()
|
||||
}
|
||||
79
flock/flock_unix.go
Normal file
79
flock/flock_unix.go
Normal file
@@ -0,0 +1,79 @@
|
||||
// +build !aix,!windows
|
||||
|
||||
package flock
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func lock_sys(path string, nonBlocking bool) (_ *os.File, err error) {
|
||||
var fh *os.File
|
||||
|
||||
fh, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
fh.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
flag := unix.LOCK_EX
|
||||
if nonBlocking {
|
||||
flag |= unix.LOCK_NB
|
||||
}
|
||||
|
||||
err = unix.Flock(int(fh.Fd()), flag)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !sameInodes(fh, path) {
|
||||
return nil, ErrInodeChangedAtPath
|
||||
}
|
||||
|
||||
return fh, nil
|
||||
}
|
||||
|
||||
func rm_if_match(fh *os.File, path string) error {
|
||||
// Sanity check :
|
||||
// before running "rm", check that the file pointed at by the
|
||||
// filehandle has the same inode as the path in the filesystem
|
||||
//
|
||||
// If this sanity check doesn't pass, store a "ErrInodeChangedAtPath" error,
|
||||
// if the check passes, run os.Remove, and store the error if any.
|
||||
//
|
||||
// note : this sanity check is in no way atomic, but :
|
||||
// - as long as only cooperative processes are involved, it will work as intended
|
||||
// - it allows to avoid 99.9% the major pitfall case: "root user forcefully removed the lockfile"
|
||||
|
||||
if !sameInodes(fh, path) {
|
||||
return ErrInodeChangedAtPath
|
||||
}
|
||||
|
||||
return os.Remove(path)
|
||||
}
|
||||
|
||||
func sameInodes(f *os.File, path string) bool {
|
||||
// get inode from opened file f:
|
||||
var fstat unix.Stat_t
|
||||
err := unix.Fstat(int(f.Fd()), &fstat)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
fileIno := fstat.Ino
|
||||
|
||||
// get inode for path on disk:
|
||||
var dstat unix.Stat_t
|
||||
err = unix.Stat(path, &dstat)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
pathIno := dstat.Ino
|
||||
|
||||
return pathIno == fileIno
|
||||
}
|
||||
236
flock/race_test.go
Normal file
236
flock/race_test.go
Normal file
@@ -0,0 +1,236 @@
|
||||
package flock
|
||||
|
||||
// the "nd" in "nd_test.go" stands for non-deterministic
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// The two tests in this file are test some concurrency scenarios :
|
||||
// 1- TestRaceLock() runs several threads racing for the same lock
|
||||
// 2- TestShatteredLock() runs racing racing threads, along with another threads which forcibly remove the file from disk
|
||||
//
|
||||
// Note that these tests are non-deterministic : the coverage produced by each test depends
|
||||
// on how the runtime chooses to schedule the concurrent goroutines.
|
||||
|
||||
var lockerCount int64
|
||||
|
||||
// lockAndCount tries to take a lock on "lockpath"
|
||||
// if it fails : it returns 0 and stop there
|
||||
// if it succeeds :
|
||||
// 1- it sets a defer function to release the lock in the same fashion as "func (b *Bitcask) Close()"
|
||||
// 2- it increments the shared "lockerCount" above
|
||||
// 3- it waits for a short amount of time
|
||||
// 4- it decrements "lockerCount"
|
||||
// 5- it returns the value it has seen at step 2.
|
||||
//
|
||||
// If the locking and unlocking behave as we expect them to,
|
||||
// instructions 1-5 should be in a critical section,
|
||||
// and the only possible value at step 2 should be "1".
|
||||
//
|
||||
// Returning a value > 0 indicates this function successfully acquired the lock,
|
||||
// returning a value == 0 indicates that TryLock failed.
|
||||
|
||||
func lockAndCount(lockpath string) int64 {
|
||||
lock := New(lockpath)
|
||||
ok, _ := lock.TryLock()
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
defer func() {
|
||||
lock.Unlock()
|
||||
}()
|
||||
|
||||
x := atomic.AddInt64(&lockerCount, 1)
|
||||
// emulate a workload :
|
||||
<-time.After(1 * time.Microsecond)
|
||||
atomic.AddInt64(&lockerCount, -1)
|
||||
|
||||
return x
|
||||
}
|
||||
|
||||
// locker will call the lock function above in a loop, until one of the following holds :
|
||||
// - reading from the "timeout" channel doesn't block
|
||||
// - the number of calls to "lock()" that indicate the lock was successfully taken reaches "successfullLockCount"
|
||||
func locker(t *testing.T, id int, lockPath string, successfulLockCount int, timeout <-chan struct{}) {
|
||||
timedOut := false
|
||||
|
||||
failCount := 0
|
||||
max := int64(0)
|
||||
|
||||
lockloop:
|
||||
for successfulLockCount > 0 {
|
||||
select {
|
||||
case <-timeout:
|
||||
timedOut = true
|
||||
break lockloop
|
||||
default:
|
||||
}
|
||||
|
||||
x := lockAndCount(lockPath)
|
||||
|
||||
if x > 0 {
|
||||
// if x indicates the lock was taken : decrement the counter
|
||||
successfulLockCount--
|
||||
}
|
||||
|
||||
if x > 1 {
|
||||
// if x indicates an invalid value : increase the failCount and update max accordingly
|
||||
failCount++
|
||||
if x > max {
|
||||
max = x
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check failure cases :
|
||||
if timedOut {
|
||||
t.Fail()
|
||||
t.Logf("[runner %02d] timed out", id)
|
||||
}
|
||||
if failCount > 0 {
|
||||
t.Fail()
|
||||
t.Logf("[runner %02d] lockCounter was > 1 on %2.d occasions, max seen value was %2.d", id, failCount, max)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRaceLock checks that no error occurs when several concurrent actors (goroutines in this case) race for the same lock.
|
||||
func TestRaceLock(t *testing.T) {
|
||||
// test parameters, written in code :
|
||||
// you may want to tweak these values for testing
|
||||
|
||||
goroutines := 20 // number of concurrent "locker" goroutines to launch
|
||||
successfulLockCount := 50 // how many times a "locker" will successfully take the lock before halting
|
||||
|
||||
// make sure there is no present lock when startng this test
|
||||
os.Remove(testLockPath)
|
||||
|
||||
// timeout implemented in code
|
||||
// (the lock acquisition depends on the behavior of the filesystem,
|
||||
// avoid sending CI in endless loop if something fishy happens on the test server ...)
|
||||
// tweak this value if needed ; comment out the "close(ch)" instruction below
|
||||
timeout := 10 * time.Second
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
<-time.After(timeout)
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(goroutines)
|
||||
|
||||
for i := 0; i < goroutines; i++ {
|
||||
go func(id int) {
|
||||
locker(t, id, testLockPath, successfulLockCount, ch)
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func isExpectedError(err error) bool {
|
||||
switch {
|
||||
case err == nil:
|
||||
return true
|
||||
case err == ErrInodeChangedAtPath:
|
||||
return true
|
||||
case errors.Is(err, syscall.ENOENT):
|
||||
return true
|
||||
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// TestShatteredLock runs concurrent goroutines on one lock, with an extra goroutine
|
||||
// which removes the lock file from disk without checking the locks
|
||||
// (e.g: a user who would run 'rm lockfile' in a loop while the program is running).
|
||||
//
|
||||
// In this scenario, errors may occur on .Unlock() ; this test checks that only errors
|
||||
// relating to the file being deleted occur.
|
||||
//
|
||||
// This test additionally logs the number of errors that occurred, grouped by error message.
|
||||
func TestShatteredLock(t *testing.T) {
|
||||
// test parameters, written in code :
|
||||
// you may want to tweak these values for testing
|
||||
|
||||
goroutines := 4 // number of concurrent "locker" and "remover" goroutines to launch
|
||||
successfulLockCount := 10 // how many times a "locker" will successfully take the lock before halting
|
||||
|
||||
// make sure there is no present lock when startng this test
|
||||
os.Remove(testLockPath)
|
||||
assert := assert.New(t)
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(goroutines)
|
||||
|
||||
stopChan := make(chan struct{})
|
||||
|
||||
errChan := make(chan error, 10)
|
||||
|
||||
for i := 0; i < goroutines; i++ {
|
||||
go func(id int, count int) {
|
||||
for count > 0 {
|
||||
lock := New(testLockPath)
|
||||
ok, _ := lock.TryLock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
count--
|
||||
err := lock.Unlock()
|
||||
if !isExpectedError(err) {
|
||||
assert.Fail("goroutine %d - unexpected error: %v", id, err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
}
|
||||
|
||||
wg.Done()
|
||||
}(i, successfulLockCount)
|
||||
}
|
||||
|
||||
var wgCompanion = &sync.WaitGroup{}
|
||||
wgCompanion.Add(2)
|
||||
|
||||
go func() {
|
||||
defer wgCompanion.Done()
|
||||
for {
|
||||
os.Remove(testLockPath)
|
||||
|
||||
select {
|
||||
case <-stopChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var errs = make(map[string]int)
|
||||
go func() {
|
||||
for err := range errChan {
|
||||
errs[err.Error()]++
|
||||
}
|
||||
wgCompanion.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
close(stopChan)
|
||||
close(errChan)
|
||||
wgCompanion.Wait()
|
||||
|
||||
for err, count := range errs {
|
||||
t.Logf(" seen %d times: %s", count, err)
|
||||
}
|
||||
}
|
||||
5
go.mod
5
go.mod
@@ -3,7 +3,6 @@ module github.com/prologic/bitcask
|
||||
go 1.13
|
||||
|
||||
require (
|
||||
github.com/gofrs/flock v0.8.0
|
||||
github.com/pelletier/go-toml v1.6.0 // indirect
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/plar/go-adaptive-radix-tree v1.0.4
|
||||
@@ -18,7 +17,7 @@ require (
|
||||
github.com/stretchr/testify v1.6.1
|
||||
github.com/tidwall/redcon v1.4.0
|
||||
golang.org/x/exp v0.0.0-20200228211341-fcea875c7e85
|
||||
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect
|
||||
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527
|
||||
gopkg.in/ini.v1 v1.53.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.2.8 // indirect
|
||||
gopkg.in/yaml.v2 v2.3.0 // indirect
|
||||
)
|
||||
|
||||
19
go.sum
19
go.sum
@@ -50,8 +50,6 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
|
||||
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
|
||||
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/gofrs/flock v0.8.0 h1:MSdYClljsF3PbENUUEx85nkWfJSGfzYI9yEBZOJz6CY=
|
||||
github.com/gofrs/flock v0.8.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
|
||||
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
@@ -117,7 +115,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
|
||||
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
|
||||
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
@@ -139,7 +136,6 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
||||
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
|
||||
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
|
||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||
github.com/pelletier/go-toml v1.6.0 h1:aetoXYr0Tv7xRU/V4B4IZJ2QcbtMUFoNb3ORp7TzIK4=
|
||||
github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t2kKREewys=
|
||||
@@ -176,35 +172,28 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK
|
||||
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
|
||||
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
|
||||
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
|
||||
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
|
||||
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
|
||||
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
|
||||
github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
|
||||
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
|
||||
github.com/spf13/cobra v0.0.7 h1:FfTH+vuMXOas8jmfb5/M7dzEYx7LpcLb7a0LPe34uOU=
|
||||
github.com/spf13/cobra v0.0.7/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
|
||||
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
|
||||
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
|
||||
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
|
||||
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
|
||||
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
|
||||
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
|
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU=
|
||||
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
|
||||
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
|
||||
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
|
||||
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
@@ -294,7 +283,6 @@ golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 h1:uYVVQ9WP/Ds2ROhcaGPeIdVq0RIXVLwsHlnvJ+cT1So=
|
||||
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
|
||||
@@ -348,19 +336,16 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
|
||||
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/ini.v1 v1.53.0 h1:c7ruDvTQi0MUTFuNpDRXLSjs7xT4TerM1icIg4uKWRg=
|
||||
gopkg.in/ini.v1 v1.53.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
|
||||
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
|
||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
|
||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
|
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
|
||||
@@ -74,7 +74,7 @@ func getKeyValueSizes(buf []byte, maxKeySize uint32, maxValueSize uint64) (uint3
|
||||
actualKeySize := binary.BigEndian.Uint32(buf[:keySize])
|
||||
actualValueSize := binary.BigEndian.Uint64(buf[keySize:])
|
||||
|
||||
if actualKeySize > maxKeySize || actualValueSize > maxValueSize || actualKeySize == 0 {
|
||||
if (maxKeySize > 0 && actualKeySize > maxKeySize) || (maxValueSize > 0 && actualValueSize > maxValueSize) || actualKeySize == 0 {
|
||||
|
||||
return 0, 0, errInvalidKeyOrValueSize
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ const (
|
||||
keySize = 4
|
||||
valueSize = 8
|
||||
checksumSize = 4
|
||||
MetaInfoSize = keySize + valueSize + checksumSize
|
||||
)
|
||||
|
||||
// NewEncoder creates a streaming Entry encoder.
|
||||
|
||||
@@ -34,7 +34,7 @@ func readKeyBytes(r io.Reader, maxKeySize uint32) ([]byte, error) {
|
||||
return nil, errors.Wrap(errTruncatedKeySize, err.Error())
|
||||
}
|
||||
size := binary.BigEndian.Uint32(s)
|
||||
if size > uint32(maxKeySize) {
|
||||
if maxKeySize > 0 && size > uint32(maxKeySize) {
|
||||
return nil, errKeySizeTooLarge
|
||||
}
|
||||
|
||||
|
||||
22
internal/metadata/metadata.go
Normal file
22
internal/metadata/metadata.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/prologic/bitcask/internal"
|
||||
)
|
||||
|
||||
type MetaData struct {
|
||||
IndexUpToDate bool `json:"index_up_to_date"`
|
||||
ReclaimableSpace int64 `json:"reclaimable_space"`
|
||||
}
|
||||
|
||||
func (m *MetaData) Save(path string, mode os.FileMode) error {
|
||||
return internal.SaveJsonToFile(m, path, mode)
|
||||
}
|
||||
|
||||
func Load(path string) (*MetaData, error) {
|
||||
var m MetaData
|
||||
err := internal.LoadFromJsonFile(path, &m)
|
||||
return &m, err
|
||||
}
|
||||
@@ -1,7 +1,9 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
@@ -63,3 +65,48 @@ func ParseIds(fns []string) ([]int, error) {
|
||||
sort.Ints(ids)
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
// Copy copies source contents to destination
|
||||
func Copy(src, dst string, exclude []string) error {
|
||||
return filepath.Walk(src, func(path string, info os.FileInfo, err error) error {
|
||||
relPath := strings.Replace(path, src, "", 1)
|
||||
if relPath == "" {
|
||||
return nil
|
||||
}
|
||||
for _, e := range exclude {
|
||||
matched, err := filepath.Match(e, info.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if matched {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if info.IsDir() {
|
||||
return os.Mkdir(filepath.Join(dst, relPath), info.Mode())
|
||||
}
|
||||
var data, err1 = ioutil.ReadFile(filepath.Join(src, relPath))
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
return ioutil.WriteFile(filepath.Join(dst, relPath), data, info.Mode())
|
||||
})
|
||||
}
|
||||
|
||||
// SaveJsonToFile converts v into json and store in file identified by path
|
||||
func SaveJsonToFile(v interface{}, path string, mode os.FileMode) error {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return ioutil.WriteFile(path, b, mode)
|
||||
}
|
||||
|
||||
// LoadFromJsonFile reads file located at `path` and put its content in json format in v
|
||||
func LoadFromJsonFile(path string, v interface{}) error {
|
||||
b, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return json.Unmarshal(b, v)
|
||||
}
|
||||
|
||||
108
internal/utils_test.go
Normal file
108
internal/utils_test.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_Copy(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
t.Run("CopyDir", func(t *testing.T) {
|
||||
tempsrc, err := ioutil.TempDir("", "test")
|
||||
assert.NoError(err)
|
||||
defer os.RemoveAll(tempsrc)
|
||||
var f *os.File
|
||||
|
||||
tempdir, err := ioutil.TempDir(tempsrc, "")
|
||||
assert.NoError(err)
|
||||
|
||||
f, err = os.OpenFile(filepath.Join(tempsrc, "file1"), os.O_WRONLY|os.O_CREATE, 0755)
|
||||
assert.NoError(err)
|
||||
n, err := f.WriteString("test123")
|
||||
assert.Equal(7, n)
|
||||
assert.NoError(err)
|
||||
f.Close()
|
||||
|
||||
f, err = os.OpenFile(filepath.Join(tempsrc, "file2"), os.O_WRONLY|os.O_CREATE, 0755)
|
||||
assert.NoError(err)
|
||||
n, err = f.WriteString("test1234")
|
||||
assert.Equal(8, n)
|
||||
assert.NoError(err)
|
||||
f.Close()
|
||||
|
||||
f, err = os.OpenFile(filepath.Join(tempsrc, "file3"), os.O_WRONLY|os.O_CREATE, 0755)
|
||||
assert.NoError(err)
|
||||
f.Close()
|
||||
|
||||
tempdst, err := ioutil.TempDir("", "backup")
|
||||
assert.NoError(err)
|
||||
defer os.RemoveAll(tempdst)
|
||||
err = Copy(tempsrc, tempdst, []string{"file3"})
|
||||
assert.NoError(err)
|
||||
buf := make([]byte, 10)
|
||||
|
||||
exists := Exists(filepath.Join(tempdst, filepath.Base(tempdir)))
|
||||
assert.Equal(true, exists)
|
||||
|
||||
f, err = os.Open(filepath.Join(tempdst, "file1"))
|
||||
assert.NoError(err)
|
||||
n, err = f.Read(buf[:7])
|
||||
assert.NoError(err)
|
||||
assert.Equal(7, n)
|
||||
assert.Equal([]byte("test123"), buf[:7])
|
||||
_, err = f.Read(buf)
|
||||
assert.Equal(io.EOF, err)
|
||||
f.Close()
|
||||
|
||||
f, err = os.Open(filepath.Join(tempdst, "file2"))
|
||||
assert.NoError(err)
|
||||
n, err = f.Read(buf[:8])
|
||||
assert.NoError(err)
|
||||
assert.Equal(8, n)
|
||||
assert.Equal([]byte("test1234"), buf[:8])
|
||||
_, err = f.Read(buf)
|
||||
assert.Equal(io.EOF, err)
|
||||
f.Close()
|
||||
|
||||
exists = Exists(filepath.Join(tempdst, "file3"))
|
||||
assert.Equal(false, exists)
|
||||
})
|
||||
}
|
||||
|
||||
func Test_SaveAndLoad(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
t.Run("save and load", func(t *testing.T) {
|
||||
tempdir, err := ioutil.TempDir("", "bitcask")
|
||||
assert.NoError(err)
|
||||
defer os.RemoveAll(tempdir)
|
||||
type test struct {
|
||||
Value bool `json:"value"`
|
||||
}
|
||||
m := test{Value: true}
|
||||
err = SaveJsonToFile(&m, filepath.Join(tempdir, "meta.json"), 0755)
|
||||
assert.NoError(err)
|
||||
m1 := test{}
|
||||
err = LoadFromJsonFile(filepath.Join(tempdir, "meta.json"), &m1)
|
||||
assert.NoError(err)
|
||||
assert.Equal(m, m1)
|
||||
})
|
||||
|
||||
t.Run("save and load error", func(t *testing.T) {
|
||||
tempdir, err := ioutil.TempDir("", "bitcask")
|
||||
assert.NoError(err)
|
||||
defer os.RemoveAll(tempdir)
|
||||
type test struct {
|
||||
Value bool `json:"value"`
|
||||
}
|
||||
err = SaveJsonToFile(make(chan int), filepath.Join(tempdir, "meta.json"), 0755)
|
||||
assert.Error(err)
|
||||
m1 := test{}
|
||||
err = LoadFromJsonFile(filepath.Join(tempdir, "meta.json"), &m1)
|
||||
assert.Error(err)
|
||||
})
|
||||
}
|
||||
13
options.go
13
options.go
@@ -31,6 +31,19 @@ const (
|
||||
// Option is a function that takes a config struct and modifies it
|
||||
type Option func(*config.Config) error
|
||||
|
||||
func withConfig(src *config.Config) Option {
|
||||
return func(cfg *config.Config) error {
|
||||
cfg.MaxDatafileSize = src.MaxDatafileSize
|
||||
cfg.MaxKeySize = src.MaxKeySize
|
||||
cfg.MaxValueSize = src.MaxValueSize
|
||||
cfg.Sync = src.Sync
|
||||
cfg.AutoRecovery = src.AutoRecovery
|
||||
cfg.DirFileModeBeforeUmask = src.DirFileModeBeforeUmask
|
||||
cfg.FileFileModeBeforeUmask = src.FileFileModeBeforeUmask
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithAutoRecovery sets auto recovery of data and index file recreation.
|
||||
// IMPORTANT: This flag MUST BE used only if a proper backup was made of all
|
||||
// the existing datafiles.
|
||||
|
||||
Reference in New Issue
Block a user