Compare commits

...

25 Commits

Author SHA1 Message Date
James Mills
803b08949e Fix setup target in Makefile to install mockery correctly 2019-10-17 13:59:00 +10:00
James Mills
65e9317d26 Fix glfmt/golint issues 2019-10-14 16:55:47 +10:00
James Mills
c4e12e0019 Fix spelling mistake in README s/Sponser/Sponsor 2019-10-08 22:04:22 +10:00
Steve Mynott
029f901bb7 fix example (#106) 2019-09-28 08:08:45 +10:00
James Mills
af8bf54962 Add *.db to ignore future accidental commits of a bitcask db to the repo 2019-09-26 21:09:27 +10:00
James Mills
5ea05fb3c2 Add unit test for opening bad database with corrupted/invalid datafiles (#105) 2019-09-26 21:07:13 +10:00
James Mills
5fe19989d4 Update Drone CI test pipeline 2019-09-25 22:15:22 +10:00
Ignacio Hagopian
498ea4069c codebeat: Code quality improvement (#103)
* codebeat: improve & bugfix

* codebeat: refactor to improve readability

* bugfix

* bugfix

* internal/data/codec: improve code coverage
2019-09-24 07:19:07 +10:00
James Mills
42c2b810bf Update README.md 2019-09-22 21:26:52 +10:00
Ignacio Hagopian
16a7feb603 cmd/bitcask: add recovery tool for datafiles (#102)
* cmd/bitcask: refactor recovery index logic

* cmd/bitcask: first version of datafile recovery tool

* cmd/bitcask: finished recovery datafile tool

* cmd/bitcask: temporary script to test recovery tool

* cmd/bitcask: remove commited binary file

* cmd/bitcask: delete bash test
2019-09-21 18:33:36 -03:00
Ignacio Hagopian
f17187a5c7 Test for data corruption in datafile decoding (#99)
* internal/data: move codec to own subpackage

* internal/data/codec: check & test nil Entry Decode

* internal/data/decoder: test for short prefix error

* internal/data/codec: test invalid key & value sizes

* internal/data/codec: check & test for truncated data

* interna/data/codec: use assert for tests
2019-09-16 09:29:08 -03:00
Ignacio Hagopian
5be114adab Makefile setup & key/value coherent datatypes & refactoring (#98)
* internal/data: comment exported functions

* internal/data: make smaller codec exported api surface

* make key and value sizes serializing bubble up to everything

* Makefile setup & go mod tidy
2019-09-12 10:44:26 -03:00
Ignacio Hagopian
7e0fa151f7 fix test compilation (#97) 2019-09-10 06:25:09 +10:00
James Mills
d59d5ad8c2 Improves Test Coverage by covering error cases (#95)
* Add Unit  Test for testing a corrupted config

* Add Unit Test for testing errors from .Stats()

* Refactor  Datafile into an interface and add Unit Tests for testing Merge() errors

* Refactor indexer into an interface and add Unit Tests for .Close() errors

* Add Unit Tests for .Delete() errors

* Add Unit Tests for  testing Put/Get errors

* Add Unit Test for testing Open errors (bad path for example)

* Refactor out bitcask.writeConfig

* Add more tests for config errors

* Add unit test for options that might error

* Add more test cases for close errors

* Add test case for rotating datafiles

* Fix a possible data race in .Stats()

* Add test case for checksum errors

* Add test case for Sync errors with Put and WithSync enabled

* Refactor and use testify.mock for mocks and generate mocks for all interfaces

* Refactor TestCloseErrors

* Refactored TestDeleteErrors

* Refactored TestGetErrors

* Refactored TestPutErrors

* Refactored TestMergeErrors and fixed a bug with .Fold()

* Add test case for Scan() errors

* Apparently only Scan() can return nil Node()s?
2019-09-09 07:18:38 +10:00
Ignacio Hagopian
13e35b7acc bitcask: fix data races & use Encode() to serialize config (#94) 2019-09-07 09:09:08 +10:00
Ignacio Hagopian
0d3a9213ed cmd/bitcask: recovery tool (#92)
* cmd/bitcask: recovery tool

* refactor configuration & use it in recover tool
2019-09-07 07:57:30 +10:00
James Mills
f4fb4972ee Improves test coverage by adding some missing unit tests (#90)
* Add Unit Test for testing WithSync() option

* Add Unit Test for testing re-indexing

* Add Unit Test for testing re-indexing with deleted keys (tombstone values)
2019-09-04 22:45:04 +10:00
James Mills
1108840967 Refactor the bitcaskd (redis compatible server) sample to improve code quality (#88) 2019-09-04 22:44:33 +10:00
James Mills
003c3abc42 Update to Go 1.13 and update README with new benchmarks (#89) 2019-09-04 22:43:53 +10:00
Ignacio Hagopian
a2b5ae2287 fix: check of persisted index values (#91) 2019-09-04 22:42:32 +10:00
James Mills
1c7df7f9c7 Removed unused readConfig() (#87) 2019-09-04 21:25:31 +10:00
Ignacio Hagopian
93cc1d409f codec_index: check sizes, new tests for data corruption & refactor (#84)
* bitcask/codec_index: check key and data sizes

* codec_index: tests for key and data size overflows

* codec_index: simplify internal funcs for unused returns
2019-09-04 12:26:26 +10:00
James Mills
24ab3fbf27 Update README.md 2019-09-04 08:20:44 +10:00
Ignacio Hagopian
8041a4c1e7 Refactor and general tests for codec index (#83)
* codec_index: unexport const fields

* codec_index: unexport internal functions and doc exported ones

* codec_index: rename func & return errors for corruption

* codec_index: new test for ReadIndex, WriteIndex, and read corruption

* Update internal/codec_index.go

Co-Authored-By: James Mills <1290234+prologic@users.noreply.github.com>

* Update internal/codec_index.go

Co-Authored-By: James Mills <1290234+prologic@users.noreply.github.com>
2019-09-03 08:19:35 +10:00
James Mills
50d3971e86 Fixed a bug with incorrect offsets populating the trie (#82) 2019-09-02 19:44:11 +10:00
31 changed files with 2177 additions and 742 deletions

.drone.yml

@@ -5,7 +5,7 @@ steps:
- name: build
image: golang:latest
commands:
- go test -v -short -cover -coverprofile=coverage.txt -coverpkg=$(go list) .
- go test -v -cover -coverprofile=coverage.txt -covermode=atomic -coverpkg=$(shell go list) -race .
- name: coverage
image: plugins/codecov

.gitignore vendored

@@ -1,5 +1,6 @@
*~*
*.bak
*.db
/coverage.txt
/bitcask

Makefile

@@ -1,4 +1,4 @@
.PHONY: dev build generate install image release profile bench test clean
.PHONY: dev build generate install image release profile bench test clean setup
CGO_ENABLED=0
VERSION=$(shell git describe --abbrev=0 --tags)
@@ -39,6 +39,9 @@ profile: build
bench: build
@go test -v -benchmem -bench=. .
mocks:
@mockery -all -case underscore -output ./internal/mocks -recursive
test: build
@go test -v \
-cover -coverprofile=coverage.txt -covermode=atomic \
@@ -46,5 +49,8 @@ test: build
-race \
.
setup:
@go get github.com/vektra/mockery/...
clean:
@git clean -f -d -X

README.md

@@ -3,6 +3,7 @@
[![Build Status](https://cloud.drone.io/api/badges/prologic/bitcask/status.svg)](https://cloud.drone.io/prologic/bitcask)
[![CodeCov](https://codecov.io/gh/prologic/bitcask/branch/master/graph/badge.svg)](https://codecov.io/gh/prologic/bitcask)
[![Go Report Card](https://goreportcard.com/badge/prologic/bitcask)](https://goreportcard.com/report/prologic/bitcask)
[![codebeat badge](https://codebeat.co/badges/15fba8a5-3044-4f40-936f-9e0f5d5d1fd9)](https://codebeat.co/projects/github-com-prologic-bitcask-master)
[![GoDoc](https://godoc.org/github.com/prologic/bitcask?status.svg)](https://godoc.org/github.com/prologic/bitcask)
[![GitHub license](https://img.shields.io/github/license/prologic/bitcask.svg)](https://github.com/prologic/bitcask)
[![Sourcegraph](https://sourcegraph.com/github.com/prologic/bitcask/-/badge.svg)](https://sourcegraph.com/github.com/prologic/bitcask?badge)
@@ -49,8 +50,8 @@ import "github.com/prologic/bitcask"
func main() {
db, _ := bitcask.Open("/tmp/db")
defer db.Close()
db.Put("Hello", []byte("World"))
val, _ := db.Get("Hello")
db.Put([]byte("Hello"), []byte("World"))
val, _ := db.Get([]byte("Hello"))
}
```
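For illustration, a slightly fuller sketch of the post-change byte-slice API with error handling (every call shown appears elsewhere in this diff; the example itself is not part of the README):

```go
package main

import (
	"fmt"
	"log"

	"github.com/prologic/bitcask"
)

func main() {
	db, err := bitcask.Open("/tmp/db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Put([]byte("Hello"), []byte("World")); err != nil {
		log.Fatal(err)
	}

	val, err := db.Get([]byte("Hello"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Hello=%s\n", val)

	// Has reports key presence; Delete writes a tombstone.
	if db.Has([]byte("Hello")) {
		_ = db.Delete([]byte("Hello"))
	}
}
```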
@@ -117,45 +118,45 @@ goos: darwin
goarch: amd64
pkg: github.com/prologic/bitcask
BenchmarkGet/128B-4 300000 3913 ns/op 32.71 MB/s 387 B/op 4 allocs/op
BenchmarkGet/128BWithPool-4 300000 4143 ns/op 30.89 MB/s 227 B/op 3 allocs/op
BenchmarkGet/256B-4 300000 3919 ns/op 65.31 MB/s 643 B/op 4 allocs/op
BenchmarkGet/256BWithPool-4 300000 4270 ns/op 59.95 MB/s 355 B/op 3 allocs/op
BenchmarkGet/512B-4 300000 4248 ns/op 120.52 MB/s 1187 B/op 4 allocs/op
BenchmarkGet/512BWithPool-4 300000 4676 ns/op 109.48 MB/s 611 B/op 3 allocs/op
BenchmarkGet/1K-4 200000 5248 ns/op 195.10 MB/s 2275 B/op 4 allocs/op
BenchmarkGet/1KWithPool-4 200000 5270 ns/op 194.28 MB/s 1123 B/op 3 allocs/op
BenchmarkGet/2K-4 200000 6229 ns/op 328.74 MB/s 4451 B/op 4 allocs/op
BenchmarkGet/2KWithPool-4 200000 6282 ns/op 325.99 MB/s 2147 B/op 3 allocs/op
BenchmarkGet/4K-4 200000 9027 ns/op 453.74 MB/s 9059 B/op 4 allocs/op
BenchmarkGet/4KWithPool-4 200000 8906 ns/op 459.87 MB/s 4195 B/op 3 allocs/op
BenchmarkGet/8K-4 100000 12024 ns/op 681.28 MB/s 17763 B/op 4 allocs/op
BenchmarkGet/8KWithPool-4 200000 11103 ns/op 737.79 MB/s 8291 B/op 3 allocs/op
BenchmarkGet/16K-4 100000 16844 ns/op 972.65 MB/s 34915 B/op 4 allocs/op
BenchmarkGet/16KWithPool-4 100000 14575 ns/op 1124.10 MB/s 16483 B/op 3 allocs/op
BenchmarkGet/32K-4 50000 27770 ns/op 1179.97 MB/s 73827 B/op 4 allocs/op
BenchmarkGet/32KWithPool-4 100000 24495 ns/op 1337.74 MB/s 32867 B/op 3 allocs/op
BenchmarkGet/128B-4 316515 3263 ns/op 39.22 MB/s 160 B/op 1 allocs/op
BenchmarkGet/256B-4 382551 3204 ns/op 79.90 MB/s 288 B/op 1 allocs/op
BenchmarkGet/512B-4 357216 3835 ns/op 133.51 MB/s 576 B/op 1 allocs/op
BenchmarkGet/1K-4 274958 4429 ns/op 231.20 MB/s 1152 B/op 1 allocs/op
BenchmarkGet/2K-4 227764 5013 ns/op 408.55 MB/s 2304 B/op 1 allocs/op
BenchmarkGet/4K-4 187557 5534 ns/op 740.15 MB/s 4864 B/op 1 allocs/op
BenchmarkGet/8K-4 153546 7652 ns/op 1070.56 MB/s 9472 B/op 1 allocs/op
BenchmarkGet/16K-4 115549 10272 ns/op 1594.95 MB/s 18432 B/op 1 allocs/op
BenchmarkGet/32K-4 69592 16405 ns/op 1997.39 MB/s 40960 B/op 1 allocs/op
BenchmarkPut/128B-4 100000 17492 ns/op 7.32 MB/s 441 B/op 6 allocs/op
BenchmarkPut/256B-4 100000 17234 ns/op 14.85 MB/s 571 B/op 6 allocs/op
BenchmarkPut/512B-4 100000 22837 ns/op 22.42 MB/s 861 B/op 6 allocs/op
BenchmarkPut/1K-4 50000 30333 ns/op 33.76 MB/s 1443 B/op 6 allocs/op
BenchmarkPut/2K-4 30000 45304 ns/op 45.21 MB/s 2606 B/op 6 allocs/op
BenchmarkPut/4K-4 20000 83953 ns/op 48.79 MB/s 5187 B/op 6 allocs/op
BenchmarkPut/8K-4 10000 142142 ns/op 57.63 MB/s 9845 B/op 6 allocs/op
BenchmarkPut/16K-4 5000 206722 ns/op 79.26 MB/s 18884 B/op 6 allocs/op
BenchmarkPut/32K-4 5000 361108 ns/op 90.74 MB/s 41582 B/op 7 allocs/op
BenchmarkPut/128BNoSync-4 123519 11094 ns/op 11.54 MB/s 49 B/op 2 allocs/op
BenchmarkPut/256BNoSync-4 84662 13398 ns/op 19.11 MB/s 50 B/op 2 allocs/op
BenchmarkPut/1KNoSync-4 46345 24855 ns/op 41.20 MB/s 58 B/op 2 allocs/op
BenchmarkPut/2KNoSync-4 28820 43817 ns/op 46.74 MB/s 68 B/op 2 allocs/op
BenchmarkPut/4KNoSync-4 13976 90059 ns/op 45.48 MB/s 89 B/op 2 allocs/op
BenchmarkPut/8KNoSync-4 7852 155101 ns/op 52.82 MB/s 130 B/op 2 allocs/op
BenchmarkPut/16KNoSync-4 4848 238113 ns/op 68.81 MB/s 226 B/op 2 allocs/op
BenchmarkPut/32KNoSync-4 2564 391483 ns/op 83.70 MB/s 377 B/op 3 allocs/op
BenchmarkScan-4 1000000 1679 ns/op 408 B/op 16 allocs/op
BenchmarkPut/128BSync-4 260 4611273 ns/op 0.03 MB/s 48 B/op 2 allocs/op
BenchmarkPut/256BSync-4 265 4665506 ns/op 0.05 MB/s 48 B/op 2 allocs/op
BenchmarkPut/1KSync-4 256 4757334 ns/op 0.22 MB/s 48 B/op 2 allocs/op
BenchmarkPut/2KSync-4 255 4996788 ns/op 0.41 MB/s 92 B/op 2 allocs/op
BenchmarkPut/4KSync-4 222 5136481 ns/op 0.80 MB/s 98 B/op 2 allocs/op
BenchmarkPut/8KSync-4 223 5530824 ns/op 1.48 MB/s 99 B/op 2 allocs/op
BenchmarkPut/16KSync-4 213 5717880 ns/op 2.87 MB/s 202 B/op 2 allocs/op
BenchmarkPut/32KSync-4 211 5835948 ns/op 5.61 MB/s 355 B/op 3 allocs/op
BenchmarkScan-4 568696 2036 ns/op 392 B/op 33 allocs/op
PASS
```
For 128B values:
* ~200,000 reads/sec
* ~50,000 writes/sec
* ~300,000 reads/sec
* ~90,000 writes/sec
* ~490,000 scans/sec
The full benchmark above shows linear performance as you increase key/value sizes. Memory pooling starts to become advantageous for larger values.
The full benchmark above shows linear performance as you increase key/value sizes.
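The headline rates are straightforward unit conversions from the ns/op figures above (shown here for the 128B cases):

```
Get:  1e9 ns/s ÷  3263 ns/op ≈ 306,000 reads/s
Put:  1e9 ns/s ÷ 11094 ns/op ≈  90,000 writes/s (NoSync)
Scan: 1e9 ns/s ÷  2036 ns/op ≈ 491,000 scans/s
```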
## Stargazers over time
@@ -167,7 +168,7 @@ Support the ongoing development of Bitcask!
**Sponser**
- Become a [Sponser](https://www.patreon.com/prologic)
- Become a [Sponsor](https://www.patreon.com/prologic)
## Contributors

bitcask.go

@@ -1,7 +1,6 @@
package bitcask
import (
"encoding/json"
"errors"
"hash/crc32"
"io"
@@ -14,6 +13,9 @@ import (
"github.com/gofrs/flock"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
"github.com/prologic/bitcask/internal/config"
"github.com/prologic/bitcask/internal/data"
"github.com/prologic/bitcask/internal/index"
)
var (
@@ -45,12 +47,13 @@ type Bitcask struct {
*flock.Flock
config *config
config *config.Config
options []Option
path string
curr *internal.Datafile
datafiles map[int]*internal.Datafile
curr data.Datafile
datafiles map[int]data.Datafile
trie art.Tree
indexer index.Indexer
}
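The struct now depends on two new abstractions, `data.Datafile` and `index.Indexer`. Their authoritative definitions live in `internal/data` and `internal/index` and are not part of this compare; the following is only a sketch of the method sets implied by the calls visible in this diff:

```go
// Inferred from usage in this diff — the real interfaces may
// declare additional methods.
type Datafile interface {
	FileID() int
	Size() int64
	Read() (internal.Entry, int64, error)
	ReadAt(index, size int64) (internal.Entry, error)
	Write(internal.Entry) (int64, int64, error)
	Sync() error
	Close() error
}

type Indexer interface {
	Load(path string, maxKeySize uint32) (art.Tree, bool, error)
	Save(t art.Tree, path string) error
}
```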
// Stats is a struct returned by Stats() on an open Bitcask instance
@@ -63,18 +66,14 @@ type Stats struct {
// Stats returns statistics about the database including the number of
// data files, keys and overall size on disk of the data
func (b *Bitcask) Stats() (stats Stats, err error) {
var size int64
size, err = internal.DirSize(b.path)
if err != nil {
if stats.Size, err = internal.DirSize(b.path); err != nil {
return
}
stats.Datafiles = len(b.datafiles)
b.mu.RLock()
stats.Datafiles = len(b.datafiles)
stats.Keys = b.trie.Size()
b.mu.RUnlock()
stats.Size = size
return
}
@@ -88,16 +87,7 @@ func (b *Bitcask) Close() error {
os.Remove(b.Flock.Path())
}()
f, err := os.OpenFile(filepath.Join(b.path, "index"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
defer f.Close()
if err := internal.WriteIndex(b.trie, f); err != nil {
return err
}
if err := f.Sync(); err != nil {
if err := b.indexer.Save(b.trie, filepath.Join(b.path, "index")); err != nil {
return err
}
@@ -118,12 +108,12 @@ func (b *Bitcask) Sync() error {
// Get retrieves the value of the given key. If the key is not found or an I/O
// error occurs a null byte slice is returned along with the error.
func (b *Bitcask) Get(key []byte) ([]byte, error) {
var df *internal.Datafile
var df data.Datafile
b.mu.RLock()
value, found := b.trie.Search(key)
b.mu.RUnlock()
if !found {
b.mu.RUnlock()
return nil, ErrKeyNotFound
}
@@ -136,6 +126,7 @@ func (b *Bitcask) Get(key []byte) ([]byte, error) {
}
e, err := df.ReadAt(item.Offset, item.Size)
b.mu.RUnlock()
if err != nil {
return nil, err
}
@@ -158,26 +149,28 @@ func (b *Bitcask) Has(key []byte) bool {
// Put stores the key and value in the database.
func (b *Bitcask) Put(key, value []byte) error {
if len(key) > b.config.maxKeySize {
if uint32(len(key)) > b.config.MaxKeySize {
return ErrKeyTooLarge
}
if len(value) > b.config.maxValueSize {
if uint64(len(value)) > b.config.MaxValueSize {
return ErrValueTooLarge
}
b.mu.Lock()
offset, n, err := b.put(key, value)
if err != nil {
b.mu.Unlock()
return err
}
if b.config.sync {
if b.config.Sync {
if err := b.curr.Sync(); err != nil {
b.mu.Unlock()
return err
}
}
item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
b.mu.Lock()
b.trie.Insert(key, item)
b.mu.Unlock()
@@ -187,12 +180,12 @@ func (b *Bitcask) Put(key, value []byte) error {
// Delete deletes the named key. If the key doesn't exist or an I/O error
// occurs the error is returned.
func (b *Bitcask) Delete(key []byte) error {
b.mu.Lock()
_, _, err := b.put(key, []byte{})
if err != nil {
b.mu.Unlock()
return err
}
b.mu.Lock()
b.trie.Delete(key)
b.mu.Unlock()
@@ -233,12 +226,6 @@ func (b *Bitcask) Keys() chan []byte {
for it := b.trie.Iterator(); it.HasNext(); {
node, _ := it.Next()
// Skip the root node
if len(node.Key()) == 0 {
continue
}
ch <- node.Key()
}
close(ch)
@@ -250,26 +237,23 @@ func (b *Bitcask) Keys() chan []byte {
// Fold iterates over all keys in the database calling the function `f` for
// each key. If the function returns an error, no further keys are processed
// and the error is returned.
func (b *Bitcask) Fold(f func(key []byte) error) error {
func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
b.mu.RLock()
defer b.mu.RUnlock()
b.trie.ForEach(func(node art.Node) bool {
if err := f(node.Key()); err != nil {
if err = f(node.Key()); err != nil {
return false
}
return true
})
return nil
return
}
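Note the change to a named error return: `Fold` now surfaces the callback's error (the commit above mentions "fixed a bug with .Fold()"), where previously it always returned nil. A minimal sketch of the post-change behavior, assuming an open `*Bitcask` db and a hypothetical sentinel error:

```go
// errStop is a hypothetical sentinel for this sketch.
errStop := errors.New("found oversized key")
err := db.Fold(func(key []byte) error {
	if len(key) > 16 {
		return errStop // iteration stops and Fold returns this error
	}
	return nil
})
if err == errStop {
	// at least one key longer than 16 bytes exists
}
```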
func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
b.mu.Lock()
defer b.mu.Unlock()
size := b.curr.Size()
if size >= int64(b.config.maxDatafileSize) {
if size >= int64(b.config.MaxDatafileSize) {
err := b.curr.Close()
if err != nil {
return -1, 0, err
@@ -277,7 +261,7 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
id := b.curr.FileID()
df, err := internal.NewDatafile(b.path, id, true)
df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize)
if err != nil {
return -1, 0, err
}
@@ -285,7 +269,7 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
b.datafiles[id] = df
id = b.curr.FileID() + 1
curr, err := internal.NewDatafile(b.path, id, false)
curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize)
if err != nil {
return -1, 0, err
}
@@ -296,93 +280,21 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
return b.curr.Write(e)
}
func (b *Bitcask) readConfig() error {
if internal.Exists(filepath.Join(b.path, "config.json")) {
data, err := ioutil.ReadFile(filepath.Join(b.path, "config.json"))
if err != nil {
return err
}
if err := json.Unmarshal(data, &b.config); err != nil {
return err
}
}
return nil
}
func (b *Bitcask) writeConfig() error {
data, err := json.Marshal(b.config)
if err != nil {
return err
}
return ioutil.WriteFile(filepath.Join(b.path, "config.json"), data, 0600)
}
func (b *Bitcask) reopen() error {
b.mu.Lock()
defer b.mu.Unlock()
fns, err := internal.GetDatafiles(b.path)
datafiles, lastID, err := loadDatafiles(b.path, b.config.MaxKeySize, b.config.MaxValueSize)
if err != nil {
return err
}
ids, err := internal.ParseIds(fns)
t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles)
if err != nil {
return err
}
datafiles := make(map[int]*internal.Datafile, len(ids))
for _, id := range ids {
df, err := internal.NewDatafile(b.path, id, true)
if err != nil {
return err
}
datafiles[id] = df
}
t := art.New()
if internal.Exists(path.Join(b.path, "index")) {
f, err := os.Open(path.Join(b.path, "index"))
if err != nil {
return err
}
defer f.Close()
if err := internal.ReadIndex(f, t); err != nil {
return err
}
} else {
for i, df := range datafiles {
for {
e, n, err := df.Read()
if err != nil {
if err == io.EOF {
break
}
return err
}
// Tombstone value (deleted key)
if len(e.Value) == 0 {
t.Delete(e.Key)
continue
}
item := internal.Item{FileID: ids[i], Offset: e.Offset, Size: n}
t.Insert(e.Key, item)
}
}
}
var id int
if len(ids) > 0 {
id = ids[(len(ids) - 1)]
}
curr, err := internal.NewDatafile(b.path, id, false)
curr, err := data.NewDatafile(b.path, lastID, false, b.config.MaxKeySize, b.config.MaxValueSize)
if err != nil {
return err
}
@@ -479,7 +391,7 @@ func (b *Bitcask) Merge() error {
// configuration options as functions.
func Open(path string, options ...Option) (*Bitcask, error) {
var (
cfg *config
cfg *config.Config
err error
)
@@ -487,8 +399,13 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return nil, err
}
cfg, err = getConfig(path)
if err != nil {
configPath := filepath.Join(path, "config.json")
if internal.Exists(configPath) {
cfg, err = config.Load(configPath)
if err != nil {
return nil, err
}
} else {
cfg = newDefaultConfig()
}
@@ -497,6 +414,7 @@ func Open(path string, options ...Option) (*Bitcask, error) {
config: cfg,
options: options,
path: path,
indexer: index.NewIndexer(),
}
for _, opt := range options {
@@ -514,7 +432,7 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return nil, ErrDatabaseLocked
}
if err := bitcask.writeConfig(); err != nil {
if err := cfg.Save(configPath); err != nil {
return nil, err
}
@@ -524,3 +442,60 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return bitcask, nil
}
func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64) (datafiles map[int]data.Datafile, lastID int, err error) {
fns, err := internal.GetDatafiles(path)
if err != nil {
return nil, 0, err
}
ids, err := internal.ParseIds(fns)
if err != nil {
return nil, 0, err
}
datafiles = make(map[int]data.Datafile, len(ids))
for _, id := range ids {
datafiles[id], err = data.NewDatafile(path, id, true, maxKeySize, maxValueSize)
if err != nil {
return
}
}
if len(ids) > 0 {
lastID = ids[len(ids)-1]
}
return
}
func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile) (art.Tree, error) {
t, found, err := indexer.Load(filepath.Join(path, "index"), maxKeySize)
if err != nil {
return nil, err
}
if !found {
for _, df := range datafiles {
var offset int64
for {
e, n, err := df.Read()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
// Tombstone value (deleted key)
if len(e.Value) == 0 {
t.Delete(e.Key)
offset += n
continue
}
item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
t.Insert(e.Key, item)
offset += n
}
}
}
return t, nil
}
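The rebuild loop above encodes the tombstone convention: a delete is persisted as a zero-length value, so replaying datafiles removes the key instead of inserting it. A sketch of the invariant (illustrative, not repo code):

```go
_ = db.Put([]byte("foo"), []byte("bar")) // replay: t.Insert("foo", item)
_ = db.Delete([]byte("foo"))             // writes Entry{Value: []byte{}}; replay: t.Delete("foo")
// After reopening without an index file, "foo" is absent from the rebuilt trie.
```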

bitcask_test.go

@@ -2,9 +2,11 @@ package bitcask
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sort"
"strings"
@@ -12,6 +14,14 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/prologic/bitcask/internal"
"github.com/prologic/bitcask/internal/config"
"github.com/prologic/bitcask/internal/mocks"
)
var (
ErrMockError = errors.New("error: mock error")
)
type sortByteArrays [][]byte
@@ -194,6 +204,200 @@ func TestDeletedKeys(t *testing.T) {
})
}
func TestConfigErrors(t *testing.T) {
assert := assert.New(t)
t.Run("CorruptConfig", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
db, err := Open(testdir)
assert.NoError(err)
assert.NoError(db.Close())
assert.NoError(ioutil.WriteFile(filepath.Join(testdir, "config.json"), []byte("foo bar baz"), 0600))
_, err = Open(testdir)
assert.Error(err)
})
t.Run("BadConfigPath", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
assert.NoError(os.Mkdir(filepath.Join(testdir, "config.json"), 0700))
_, err = Open(testdir)
assert.Error(err)
})
}
func TestReIndex(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
t.Run("Setup", func(t *testing.T) {
var (
db *Bitcask
err error
)
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
err = db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
val, err := db.Get([]byte("foo"))
assert.NoError(err)
assert.Equal([]byte("bar"), val)
})
t.Run("Sync", func(t *testing.T) {
err = db.Sync()
assert.NoError(err)
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
t.Run("DeleteIndex", func(t *testing.T) {
err := os.Remove(filepath.Join(testdir, "index"))
assert.NoError(err)
})
})
t.Run("Reopen", func(t *testing.T) {
var (
db *Bitcask
err error
)
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
val, err := db.Get([]byte("foo"))
assert.NoError(err)
assert.Equal([]byte("bar"), val)
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
})
}
func TestReIndexDeletedKeys(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
t.Run("Setup", func(t *testing.T) {
var (
db *Bitcask
err error
)
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
err = db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
val, err := db.Get([]byte("foo"))
assert.NoError(err)
assert.Equal([]byte("bar"), val)
})
t.Run("Delete", func(t *testing.T) {
err := db.Delete([]byte("foo"))
assert.NoError(err)
_, err = db.Get([]byte("foo"))
assert.Error(err)
assert.Equal(ErrKeyNotFound, err)
})
t.Run("Sync", func(t *testing.T) {
err = db.Sync()
assert.NoError(err)
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
t.Run("DeleteIndex", func(t *testing.T) {
err := os.Remove(filepath.Join(testdir, "index"))
assert.NoError(err)
})
})
t.Run("Reopen", func(t *testing.T) {
var (
db *Bitcask
err error
)
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
_, err := db.Get([]byte("foo"))
assert.Error(err)
assert.Equal(ErrKeyNotFound, err)
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
})
}
func TestSync(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
var db *Bitcask
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithSync(true))
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
key := []byte(strings.Repeat(" ", 17))
value := []byte("foobar")
err = db.Put(key, value)
})
}
func TestMaxKeySize(t *testing.T) {
assert := assert.New(t)
@@ -285,6 +489,109 @@ func TestStats(t *testing.T) {
})
}
func TestStatsError(t *testing.T) {
var (
db *Bitcask
err error
)
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
t.Run("Setup", func(t *testing.T) {
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
err := db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
val, err := db.Get([]byte("foo"))
assert.NoError(err)
assert.Equal([]byte("bar"), val)
})
t.Run("Stats", func(t *testing.T) {
stats, err := db.Stats()
assert.NoError(err)
assert.Equal(stats.Datafiles, 0)
assert.Equal(stats.Keys, 1)
})
t.Run("FabricatedDestruction", func(t *testing.T) {
// This would never happen in reality :D
// Or would it? :)
err = os.RemoveAll(testdir)
assert.NoError(err)
})
t.Run("Stats", func(t *testing.T) {
_, err := db.Stats()
assert.Error(err)
})
})
}
func TestMaxDatafileSize(t *testing.T) {
var (
db *Bitcask
err error
)
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
t.Run("Setup", func(t *testing.T) {
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxDatafileSize(32))
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
err := db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
})
})
t.Run("Put", func(t *testing.T) {
for i := 0; i < 10; i++ {
err := db.Put([]byte(fmt.Sprintf("key_%d", i)), []byte("bar"))
assert.NoError(err)
}
})
t.Run("Sync", func(t *testing.T) {
err = db.Sync()
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
val, err := db.Get([]byte("foo"))
assert.NoError(err)
assert.Equal([]byte("bar"), val)
for i := 0; i < 10; i++ {
val, err = db.Get([]byte(fmt.Sprintf("key_%d", i)))
assert.NoError(err)
assert.Equal([]byte("bar"), val)
}
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
}
func TestMerge(t *testing.T) {
var (
db *Bitcask
@@ -349,6 +656,309 @@ func TestMerge(t *testing.T) {
})
}
func TestGetErrors(t *testing.T) {
assert := assert.New(t)
t.Run("ReadError", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
db, err := Open(testdir, WithMaxDatafileSize(32))
assert.NoError(err)
err = db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
mockDatafile := new(mocks.Datafile)
mockDatafile.On("FileID").Return(0)
mockDatafile.On("ReadAt", int64(0), int64(22)).Return(
internal.Entry{},
ErrMockError,
)
db.curr = mockDatafile
_, err = db.Get([]byte("foo"))
assert.Error(err)
assert.Equal(ErrMockError, err)
})
t.Run("ChecksumError", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
db, err := Open(testdir, WithMaxDatafileSize(32))
assert.NoError(err)
err = db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
mockDatafile := new(mocks.Datafile)
mockDatafile.On("FileID").Return(0)
mockDatafile.On("ReadAt", int64(0), int64(22)).Return(
internal.Entry{
Checksum: 0x0,
Key: []byte("foo"),
Offset: 0,
Value: []byte("bar"),
},
nil,
)
db.curr = mockDatafile
_, err = db.Get([]byte("foo"))
assert.Error(err)
assert.Equal(ErrChecksumFailed, err)
})
}
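These error-path tests swap `db.curr` for a mock generated by mockery (see the new `make mocks` target) on top of testify's `mock` package. A stripped-down, hypothetical sketch of what such a generated mock looks like:

```go
package mocks

import (
	"github.com/stretchr/testify/mock"

	"github.com/prologic/bitcask/internal"
)

// Datafile mocks data.Datafile; the generated version covers
// every method of the interface.
type Datafile struct {
	mock.Mock
}

func (m *Datafile) FileID() int {
	args := m.Called()
	return args.Int(0)
}

func (m *Datafile) ReadAt(index, size int64) (internal.Entry, error) {
	args := m.Called(index, size)
	return args.Get(0).(internal.Entry), args.Error(1)
}
```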
func TestPutErrors(t *testing.T) {
assert := assert.New(t)
t.Run("WriteError", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
db, err := Open(testdir)
assert.NoError(err)
mockDatafile := new(mocks.Datafile)
mockDatafile.On("Size").Return(int64(0))
mockDatafile.On(
"Write",
internal.Entry{
Checksum: 0x76ff8caa,
Key: []byte("foo"),
Offset: 0,
Value: []byte("bar"),
},
).Return(int64(0), int64(0), ErrMockError)
db.curr = mockDatafile
err = db.Put([]byte("foo"), []byte("bar"))
assert.Error(err)
assert.Equal(ErrMockError, err)
})
t.Run("SyncError", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
db, err := Open(testdir, WithSync(true))
assert.NoError(err)
mockDatafile := new(mocks.Datafile)
mockDatafile.On("Size").Return(int64(0))
mockDatafile.On(
"Write",
internal.Entry{
Checksum: 0x78240498,
Key: []byte("bar"),
Offset: 0,
Value: []byte("baz"),
},
).Return(int64(0), int64(0), nil)
mockDatafile.On("Sync").Return(ErrMockError)
db.curr = mockDatafile
err = db.Put([]byte("bar"), []byte("baz"))
assert.Error(err)
assert.Equal(ErrMockError, err)
})
}
func TestOpenErrors(t *testing.T) {
assert := assert.New(t)
t.Run("BadPath", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
assert.NoError(ioutil.WriteFile(filepath.Join(testdir, "foo"), []byte("foo"), 0600))
_, err = Open(filepath.Join(testdir, "foo", "tmp.db"))
assert.Error(err)
})
t.Run("BadOption", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
withBogusOption := func() Option {
return func(cfg *config.Config) error {
return errors.New("mocked error")
}
}
_, err = Open(testdir, withBogusOption())
assert.Error(err)
})
t.Run("LoadDatafilesError", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
db, err := Open(testdir)
assert.NoError(err)
err = db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
err = db.Close()
assert.NoError(err)
// Simulate something horrible happening to the datafiles!
err = os.Rename(filepath.Join(testdir, "000000000.data"), filepath.Join(testdir, "000000000xxx.data"))
assert.NoError(err)
_, err = Open(testdir)
assert.Error(err)
assert.Equal("strconv.ParseInt: parsing \"000000000xxx\": invalid syntax", err.Error())
})
}
func TestCloseErrors(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
t.Run("CloseIndexError", func(t *testing.T) {
db, err := Open(testdir, WithMaxDatafileSize(32))
assert.NoError(err)
mockIndexer := new(mocks.Indexer)
mockIndexer.On("Save", db.trie, filepath.Join(db.path, "index")).Return(ErrMockError)
db.indexer = mockIndexer
err = db.Close()
assert.Error(err)
assert.Equal(ErrMockError, err)
})
t.Run("CloseDatafilesError", func(t *testing.T) {
db, err := Open(testdir, WithMaxDatafileSize(32))
assert.NoError(err)
mockDatafile := new(mocks.Datafile)
mockDatafile.On("Close").Return(ErrMockError)
db.datafiles[0] = mockDatafile
err = db.Close()
assert.Error(err)
assert.Equal(ErrMockError, err)
})
t.Run("CloseActiveDatafileError", func(t *testing.T) {
db, err := Open(testdir, WithMaxDatafileSize(32))
assert.NoError(err)
mockDatafile := new(mocks.Datafile)
mockDatafile.On("Close").Return(ErrMockError)
db.curr = mockDatafile
err = db.Close()
assert.Error(err)
assert.Equal(ErrMockError, err)
})
}
func TestDeleteErrors(t *testing.T) {
assert := assert.New(t)
t.Run("WriteError", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
db, err := Open(testdir, WithMaxDatafileSize(32))
assert.NoError(err)
err = db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
mockDatafile := new(mocks.Datafile)
mockDatafile.On("Size").Return(int64(0))
mockDatafile.On(
"Write",
internal.Entry{
Checksum: 0x0,
Key: []byte("foo"),
Offset: 0,
Value: []byte{},
},
).Return(int64(0), int64(0), ErrMockError)
db.curr = mockDatafile
err = db.Delete([]byte("foo"))
assert.Error(err)
})
}
func TestMergeErrors(t *testing.T) {
assert := assert.New(t)
t.Run("RemoveDatabaseDirectory", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
db, err := Open(testdir, WithMaxDatafileSize(32))
assert.NoError(err)
assert.NoError(os.RemoveAll(testdir))
err = db.Merge()
assert.Error(err)
})
t.Run("EmptyCloseError", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
db, err := Open(testdir)
assert.NoError(err)
mockDatafile := new(mocks.Datafile)
mockDatafile.On("Close").Return(ErrMockError)
db.curr = mockDatafile
err = db.Merge()
assert.Error(err)
assert.Equal(ErrMockError, err)
})
t.Run("ReadError", func(t *testing.T) {
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(testdir)
db, err := Open(testdir)
assert.NoError(err)
assert.NoError(db.Put([]byte("foo"), []byte("bar")))
mockDatafile := new(mocks.Datafile)
mockDatafile.On("FileID").Return(0)
mockDatafile.On("ReadAt", int64(0), int64(22)).Return(
internal.Entry{},
ErrMockError,
)
db.curr = mockDatafile
err = db.Merge()
assert.Error(err)
assert.Equal(ErrMockError, err)
})
}
func TestConcurrent(t *testing.T) {
var (
db *Bitcask
@@ -477,6 +1087,14 @@ func TestScan(t *testing.T) {
vals = SortByteArrays(vals)
assert.Equal(expected, vals)
})
t.Run("ScanErrors", func(t *testing.T) {
err = db.Scan([]byte("fo"), func(key []byte) error {
return ErrMockError
})
assert.Error(err)
assert.Equal(ErrMockError, err)
})
}
func TestLocking(t *testing.T) {
@@ -531,8 +1149,8 @@ func BenchmarkGet(b *testing.B) {
value := []byte(strings.Repeat(" ", tt.size))
options := []Option{
WithMaxKeySize(len(key)),
WithMaxValueSize(tt.size),
WithMaxKeySize(uint32(len(key))),
WithMaxValueSize(uint64(tt.size)),
}
db, err := Open(testdir, options...)
if err != nil {
@@ -578,10 +1196,10 @@ func BenchmarkPut(b *testing.B) {
}
variants := map[string][]Option{
"NoSync": []Option{
"NoSync": {
WithSync(false),
},
"Sync": []Option{
"Sync": {
WithSync(true),
},
}

cmd/bitcask/export.go

@@ -50,11 +50,11 @@ func init() {
"with-max-datafile-size", "", bitcask.DefaultMaxDatafileSize,
"Maximum size of each datafile",
)
exportCmd.PersistentFlags().IntP(
exportCmd.PersistentFlags().Uint32P(
"with-max-key-size", "", bitcask.DefaultMaxKeySize,
"Maximum size of each key",
)
exportCmd.PersistentFlags().IntP(
exportCmd.PersistentFlags().Uint64P(
"with-max-value-size", "", bitcask.DefaultMaxValueSize,
"Maximum size of each value",
)
@@ -66,11 +66,6 @@ type kvPair struct {
}
func export(path, output string) int {
var (
err error
w io.WriteCloser
)
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")
@@ -78,19 +73,29 @@ func export(path, output string) int {
}
defer db.Close()
if output == "-" {
w = os.Stdout
} else {
w, err = os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_TRUNC, 0755)
if err != nil {
w := os.Stdout
if output != "-" {
if w, err = os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_TRUNC, 0755); err != nil {
log.WithError(err).
WithField("output", output).
Error("error opening output for writing")
return 1
}
defer w.Close()
}
err = db.Fold(func(key []byte) error {
if err = db.Fold(exportKey(db, w)); err != nil {
log.WithError(err).
WithField("path", path).
WithField("output", output).
Error("error exporting keys")
return 2
}
return 0
}
func exportKey(db *bitcask.Bitcask, w io.Writer) func(key []byte) error {
return func(key []byte) error {
value, err := db.Get(key)
if err != nil {
log.WithError(err).
@@ -129,14 +134,5 @@ func export(path, output string) int {
}
return nil
})
if err != nil {
log.WithError(err).
WithField("path", path).
WithField("output", output).
Error("error exporting keys")
return 2
}
return 0
}

cmd/bitcask/initdb.go

@@ -30,8 +30,8 @@ var initdbCmd = &cobra.Command{
path := viper.GetString("path")
maxDatafileSize := viper.GetInt("with-max-datafile-size")
maxKeySize := viper.GetInt("with-max-key-size")
maxValueSize := viper.GetInt("with-max-value-size")
maxKeySize := viper.GetUint32("with-max-key-size")
maxValueSize := viper.GetUint64("with-max-value-size")
db, err := bitcask.Open(
path,
@@ -56,11 +56,11 @@ func init() {
"with-max-datafile-size", "", bitcask.DefaultMaxDatafileSize,
"Maximum size of each datafile",
)
initdbCmd.PersistentFlags().IntP(
initdbCmd.PersistentFlags().Uint32P(
"with-max-key-size", "", bitcask.DefaultMaxKeySize,
"Maximum size of each key",
)
initdbCmd.PersistentFlags().IntP(
initdbCmd.PersistentFlags().Uint64P(
"with-max-value-size", "", bitcask.DefaultMaxValueSize,
"Maximum size of each value",
)

cmd/bitcask/recover.go Normal file

@@ -0,0 +1,141 @@
package main
import (
"fmt"
"io"
"os"
"path/filepath"
"github.com/prologic/bitcask"
"github.com/prologic/bitcask/internal"
"github.com/prologic/bitcask/internal/config"
"github.com/prologic/bitcask/internal/data/codec"
"github.com/prologic/bitcask/internal/index"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var recoveryCmd = &cobra.Command{
Use: "recover",
Aliases: []string{"recovery"},
Short: "Analyzes and recovers the index file for corruption scenarios",
Long: `This analyzes the database files to detect different forms of corruption in
persisted data. It also allows recovering the files to the latest point of integrity.
Recovered files have the .recovered extension`,
Args: cobra.ExactArgs(0),
PreRun: func(cmd *cobra.Command, args []string) {
viper.BindPFlag("dry-run", cmd.Flags().Lookup("dry-run"))
},
Run: func(cmd *cobra.Command, args []string) {
path := viper.GetString("path")
dryRun := viper.GetBool("dry-run")
os.Exit(recover(path, dryRun))
},
}
func init() {
RootCmd.AddCommand(recoveryCmd)
recoveryCmd.Flags().BoolP("dry-run", "n", false, "Only check file health without applying recovery if unhealthy")
}
func recover(path string, dryRun bool) int {
maxKeySize := bitcask.DefaultMaxKeySize
maxValueSize := bitcask.DefaultMaxValueSize
if cfg, err := config.Load(filepath.Join(path, "config.json")); err == nil {
maxKeySize = cfg.MaxKeySize
maxValueSize = cfg.MaxValueSize
}
if err := recoverIndex(filepath.Join(path, "index"), maxKeySize, dryRun); err != nil {
log.WithError(err).Info("recovering index file")
return 1
}
datafiles, err := internal.GetDatafiles(path)
if err != nil {
log.WithError(err).Info("couldn't list existing datafiles")
return 1
}
for _, file := range datafiles {
if err := recoverDatafile(file, maxKeySize, maxValueSize, dryRun); err != nil {
log.WithError(err).Info("recovering data file")
return 1
}
}
return 0
}
func recoverIndex(path string, maxKeySize uint32, dryRun bool) error {
t, found, err := index.NewIndexer().Load(path, maxKeySize)
if err != nil && !index.IsIndexCorruption(err) {
log.WithError(err).Info("opening the index file")
}
if !found {
log.Info("index file doesn't exist, will be recreated on next run.")
return nil
}
if err == nil {
log.Debug("index file is not corrupted")
return nil
}
log.Debugf("index file is corrupted: %v", err)
if dryRun {
log.Debug("dry-run mode, not writing to a file")
return nil
}
// Leverage that t holds the partially read tree even for corrupted files
err = index.NewIndexer().Save(t, "index.recovered")
if err != nil {
return fmt.Errorf("writing the recovered index file: %w", err)
}
log.Debug("the index was recovered in the new file index.recovered")
return nil
}
func recoverDatafile(path string, maxKeySize uint32, maxValueSize uint64, dryRun bool) error {
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("opening the datafile: %w", err)
}
defer f.Close()
_, file := filepath.Split(path)
fr, err := os.OpenFile(fmt.Sprintf("%s.recovered", file), os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return fmt.Errorf("creating the recovered datafile: %w", err)
}
defer fr.Close()
dec := codec.NewDecoder(f, maxKeySize, maxValueSize)
enc := codec.NewEncoder(fr)
e := internal.Entry{}
for {
_, err = dec.Decode(&e)
if err == io.EOF {
break
}
if codec.IsCorruptedData(err) {
log.Debugf("%s is corrupted, a best-effort recovery was done", file)
return nil
}
if err != nil {
return fmt.Errorf("unexpected error while reading datafile: %w", err)
}
if dryRun {
continue
}
if _, err := enc.Encode(e); err != nil {
return fmt.Errorf("writing to recovered datafile: %w", err)
}
}
if err := os.Remove(fr.Name()); err != nil {
return fmt.Errorf("can't remove temporary recovered datafile: %w", err)
}
log.Debugf("%s is not corrupted", file)
return nil
}
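The `%w` verbs above rely on error wrapping introduced in Go 1.13, which lines up with the `go 1.13` bump in go.mod further down. A quick illustration of why wrapping matters here:

```go
// Sketch: the wrapped error remains inspectable with errors.Is (Go 1.13+).
err := fmt.Errorf("opening the datafile: %w", os.ErrNotExist)
fmt.Println(errors.Is(err, os.ErrNotExist)) // true
```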

cmd/bitcaskd/main.go

@@ -3,26 +3,22 @@ package main
import (
"fmt"
"os"
"strings"
log "github.com/sirupsen/logrus"
flag "github.com/spf13/pflag"
"github.com/tidwall/redcon"
"github.com/prologic/bitcask"
"github.com/prologic/bitcask/internal"
)
var (
bind string
debug bool
version bool
maxDatafileSize int
bind string
debug bool
version bool
)
func init() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [options] <path>\n", os.Args[0])
fmt.Fprintf(os.Stderr, "Usage: %s [options] <dbpath>\n", os.Args[0])
flag.PrintDefaults()
}
@@ -30,8 +26,6 @@ func init() {
flag.BoolVarP(&debug, "debug", "d", false, "enable debug logging")
flag.StringVarP(&bind, "bind", "b", ":6379", "interface and port to bind to")
flag.IntVar(&maxDatafileSize, "max-datafile-size", 1<<20, "maximum datafile size in bytes")
}
func main() {
@@ -55,86 +49,11 @@ func main() {
path := flag.Arg(0)
db, err := bitcask.Open(path, bitcask.WithMaxDatafileSize(maxDatafileSize))
server, err := newServer(bind, path)
if err != nil {
log.WithError(err).WithField("path", path).Error("error opening database")
os.Exit(1)
log.WithError(err).Error("error creating server")
os.Exit(2)
}
log.WithField("bind", bind).WithField("path", path).Infof("starting bitcaskd v%s", internal.FullVersion())
err = redcon.ListenAndServe(bind,
func(conn redcon.Conn, cmd redcon.Command) {
switch strings.ToLower(string(cmd.Args[0])) {
case "ping":
conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
case "set":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
value := cmd.Args[2]
err = db.Put(key, value)
if err != nil {
conn.WriteString(fmt.Sprintf("ERR: %s", err))
} else {
conn.WriteString("OK")
}
case "get":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
value, err := db.Get(key)
if err != nil {
conn.WriteNull()
} else {
conn.WriteBulk(value)
}
case "keys":
conn.WriteArray(db.Len())
for key := range db.Keys() {
conn.WriteBulk([]byte(key))
}
case "exists":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
if db.Has(key) {
conn.WriteInt(1)
} else {
conn.WriteInt(0)
}
case "del":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
err := db.Delete(key)
if err != nil {
conn.WriteInt(0)
} else {
conn.WriteInt(1)
}
default:
conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
}
},
func(conn redcon.Conn) bool {
return true
},
func(conn redcon.Conn, err error) {
},
)
if err != nil {
log.WithError(err).Fatal("oops")
}
log.Fatal(server.Run())
}

cmd/bitcaskd/server.go Normal file

@@ -0,0 +1,122 @@
package main
import (
"fmt"
"strings"
log "github.com/sirupsen/logrus"
"github.com/tidwall/redcon"
"github.com/prologic/bitcask"
)
type server struct {
bind string
db *bitcask.Bitcask
}
func newServer(bind, dbpath string) (*server, error) {
db, err := bitcask.Open(dbpath)
if err != nil {
log.WithError(err).WithField("dbpath", dbpath).Error("error opening database")
return nil, err
}
return &server{
bind: bind,
db: db,
}, nil
}
func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) {
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
value := cmd.Args[2]
if err := s.db.Put(key, value); err != nil {
conn.WriteString(fmt.Sprintf("ERR: %s", err))
} else {
conn.WriteString("OK")
}
}
func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) {
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
value, err := s.db.Get(key)
if err != nil {
conn.WriteNull()
} else {
conn.WriteBulk(value)
}
}
func (s *server) handleKeys(cmd redcon.Command, conn redcon.Conn) {
conn.WriteArray(s.db.Len())
for key := range s.db.Keys() {
conn.WriteBulk([]byte(key))
}
}
func (s *server) handleExists(cmd redcon.Command, conn redcon.Conn) {
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
if s.db.Has(key) {
conn.WriteInt(1)
} else {
conn.WriteInt(0)
}
}
func (s *server) handleDel(cmd redcon.Command, conn redcon.Conn) {
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
if err := s.db.Delete(key); err != nil {
conn.WriteInt(0)
} else {
conn.WriteInt(1)
}
}
func (s *server) Run() (err error) {
err = redcon.ListenAndServe(s.bind,
func(conn redcon.Conn, cmd redcon.Command) {
switch strings.ToLower(string(cmd.Args[0])) {
case "ping":
conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
case "set":
s.handleSet(cmd, conn)
case "get":
s.handleGet(cmd, conn)
case "keys":
s.handleKeys(cmd, conn)
case "exists":
s.handleExists(cmd, conn)
case "del":
s.handleDel(cmd, conn)
default:
conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
}
},
func(conn redcon.Conn) bool {
return true
},
func(conn redcon.Conn, err error) {
},
)
return
}
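Once running, bitcaskd speaks enough of the Redis protocol for the commands handled above. An illustrative redis-cli session against the default `:6379` bind (responses per the handlers' WriteString/WriteBulk/WriteInt calls):

```
127.0.0.1:6379> SET Hello World
OK
127.0.0.1:6379> GET Hello
"World"
127.0.0.1:6379> EXISTS Hello
(integer) 1
127.0.0.1:6379> DEL Hello
(integer) 1
```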

go.mod

@@ -1,6 +1,6 @@
module github.com/prologic/bitcask
go 1.12
go 1.13
require (
github.com/gofrs/flock v0.7.1

go.sum

@@ -116,6 +116,7 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM
github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
@@ -172,6 +173,7 @@ golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846 h1:0oJP+9s5Z3MT6dym56c4f7nVeujVpL1QyD2Vp/bTql0=
golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=

internal/codec.go

@@ -1,105 +0,0 @@
package internal
import (
"bufio"
"encoding/binary"
"io"
"github.com/pkg/errors"
)
const (
KeySize = 4
ValueSize = 8
checksumSize = 4
)
// NewEncoder creates a streaming Entry encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: bufio.NewWriter(w)}
}
// Encoder wraps an underlying io.Writer and allows you to stream
// Entry encodings on it.
type Encoder struct {
w *bufio.Writer
}
// Encode takes any Entry and streams it to the underlying writer.
// Messages are framed with a key-length and value-length prefix.
func (e *Encoder) Encode(msg Entry) (int64, error) {
var bufKeyValue = make([]byte, KeySize+ValueSize)
binary.BigEndian.PutUint32(bufKeyValue[:KeySize], uint32(len(msg.Key)))
binary.BigEndian.PutUint64(bufKeyValue[KeySize:KeySize+ValueSize], uint64(len(msg.Value)))
if _, err := e.w.Write(bufKeyValue); err != nil {
return 0, errors.Wrap(err, "failed writing key & value length prefix")
}
if _, err := e.w.Write(msg.Key); err != nil {
return 0, errors.Wrap(err, "failed writing key data")
}
if _, err := e.w.Write(msg.Value); err != nil {
return 0, errors.Wrap(err, "failed writing value data")
}
bufChecksumSize := bufKeyValue[:checksumSize]
binary.BigEndian.PutUint32(bufChecksumSize, msg.Checksum)
if _, err := e.w.Write(bufChecksumSize); err != nil {
return 0, errors.Wrap(err, "failed writing checksum data")
}
if err := e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(KeySize + ValueSize + len(msg.Key) + len(msg.Value) + checksumSize), nil
}
// NewDecoder creates a streaming Entry decoder.
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{r: r}
}
// Decoder wraps an underlying io.Reader and allows you to stream
// Entry decodings on it.
type Decoder struct {
r io.Reader
}
func (d *Decoder) Decode(v *Entry) (int64, error) {
prefixBuf := make([]byte, KeySize+ValueSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil {
return 0, err
}
actualKeySize, actualValueSize := GetKeyValueSizes(prefixBuf)
buf := make([]byte, actualKeySize+actualValueSize+checksumSize)
if _, err = io.ReadFull(d.r, buf); err != nil {
return 0, errors.Wrap(translateError(err), "failed reading saved data")
}
DecodeWithoutPrefix(buf, actualKeySize, v)
return int64(KeySize + ValueSize + actualKeySize + actualValueSize + checksumSize), nil
}
func GetKeyValueSizes(buf []byte) (uint64, uint64) {
actualKeySize := binary.BigEndian.Uint32(buf[:KeySize])
actualValueSize := binary.BigEndian.Uint64(buf[KeySize:])
return uint64(actualKeySize), actualValueSize
}
func DecodeWithoutPrefix(buf []byte, valueOffset uint64, v *Entry) {
v.Key = buf[:valueOffset]
v.Value = buf[valueOffset : len(buf)-checksumSize]
v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:])
}
func translateError(err error) error {
if err == io.EOF {
return io.ErrUnexpectedEOF
}
return err
}

internal/codec_index.go

@@ -1,110 +0,0 @@
package internal
import (
"encoding/binary"
"io"
art "github.com/plar/go-adaptive-radix-tree"
)
const (
Int32Size = 4
Int64Size = 8
FileIDSize = Int32Size
OffsetSize = Int64Size
SizeSize = Int64Size
)
func ReadBytes(r io.Reader) ([]byte, error) {
s := make([]byte, Int32Size)
_, err := io.ReadFull(r, s)
if err != nil {
return nil, err
}
size := binary.BigEndian.Uint32(s)
b := make([]byte, size)
_, err = io.ReadFull(r, b)
if err != nil {
return nil, err
}
return b, nil
}
func WriteBytes(b []byte, w io.Writer) (int, error) {
s := make([]byte, Int32Size)
binary.BigEndian.PutUint32(s, uint32(len(b)))
n, err := w.Write(s)
if err != nil {
return n, err
}
m, err := w.Write(b)
if err != nil {
return (n + m), err
}
return (n + m), nil
}
func ReadItem(r io.Reader) (Item, error) {
buf := make([]byte, (FileIDSize + OffsetSize + SizeSize))
_, err := io.ReadFull(r, buf)
if err != nil {
return Item{}, err
}
return Item{
FileID: int(binary.BigEndian.Uint32(buf[:FileIDSize])),
Offset: int64(binary.BigEndian.Uint64(buf[FileIDSize:(FileIDSize + OffsetSize)])),
Size: int64(binary.BigEndian.Uint64(buf[(FileIDSize + OffsetSize):])),
}, nil
}
func WriteItem(item Item, w io.Writer) (int, error) {
buf := make([]byte, (FileIDSize + OffsetSize + SizeSize))
binary.BigEndian.PutUint32(buf[:FileIDSize], uint32(item.FileID))
binary.BigEndian.PutUint64(buf[FileIDSize:(FileIDSize+OffsetSize)], uint64(item.Offset))
binary.BigEndian.PutUint64(buf[(FileIDSize+OffsetSize):], uint64(item.Size))
n, err := w.Write(buf)
if err != nil {
return 0, err
}
return n, nil
}
func ReadIndex(r io.Reader, t art.Tree) error {
for {
key, err := ReadBytes(r)
if err != nil {
if err == io.EOF {
break
}
return err
}
item, err := ReadItem(r)
if err != nil {
return err
}
t.Insert(key, item)
}
return nil
}
func WriteIndex(t art.Tree, w io.Writer) (err error) {
t.ForEach(func(node art.Node) bool {
_, err = WriteBytes(node.Key(), w)
if err != nil {
return false
}
item := node.Value().(Item)
_, err := WriteItem(item, w)
if err != nil {
return false
}
return true
})
return
}

internal/config/config.go Normal file

@@ -0,0 +1,54 @@
package config
import (
"encoding/json"
"io/ioutil"
"os"
)
// Config contains the bitcask configuration parameters
type Config struct {
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize uint32 `json:"max_key_size"`
MaxValueSize uint64 `json:"max_value_size"`
Sync bool `json:"sync"`
}
// Load loads a configuration from the given path
func Load(path string) (*Config, error) {
var cfg Config
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &cfg); err != nil {
return nil, err
}
return &cfg, nil
}
// Save saves the configuration to the provided path
func (c *Config) Save(path string) error {
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
return err
}
data, err := json.Marshal(c)
if err != nil {
return err
}
if _, err = f.Write(data); err != nil {
return err
}
if err = f.Sync(); err != nil {
return err
}
return f.Close()
}
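A minimal round-trip with the new package (paths and values are illustrative only):

```go
package main

import (
	"fmt"
	"log"

	"github.com/prologic/bitcask/internal/config"
)

func main() {
	cfg := &config.Config{
		MaxDatafileSize: 1 << 20, // illustrative values only
		MaxKeySize:      64,
		MaxValueSize:    1 << 16,
		Sync:            false,
	}
	if err := cfg.Save("/tmp/config.json"); err != nil {
		log.Fatal(err)
	}
	loaded, err := config.Load("/tmp/config.json")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", *loaded) // matches *cfg
}
```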

internal/data/codec/decoder.go Normal file

@@ -0,0 +1,99 @@
package codec
import (
"encoding/binary"
"io"
"github.com/pkg/errors"
"github.com/prologic/bitcask/internal"
)
var (
errInvalidKeyOrValueSize = errors.New("key/value size is invalid")
errCantDecodeOnNilEntry = errors.New("can't decode on nil entry")
errTruncatedData = errors.New("data is truncated")
)
// NewDecoder creates a streaming Entry decoder.
func NewDecoder(r io.Reader, maxKeySize uint32, maxValueSize uint64) *Decoder {
return &Decoder{
r: r,
maxKeySize: maxKeySize,
maxValueSize: maxValueSize,
}
}
// Decoder wraps an underlying io.Reader and allows you to stream
// Entry decodings on it.
type Decoder struct {
r io.Reader
maxKeySize uint32
maxValueSize uint64
}
// Decode decodes the next Entry from the current stream
func (d *Decoder) Decode(v *internal.Entry) (int64, error) {
if v == nil {
return 0, errCantDecodeOnNilEntry
}
prefixBuf := make([]byte, keySize+valueSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil {
return 0, err
}
actualKeySize, actualValueSize, err := getKeyValueSizes(prefixBuf, d.maxKeySize, d.maxValueSize)
if err != nil {
return 0, err
}
buf := make([]byte, uint64(actualKeySize)+actualValueSize+checksumSize)
if _, err = io.ReadFull(d.r, buf); err != nil {
return 0, errTruncatedData
}
decodeWithoutPrefix(buf, actualKeySize, v)
return int64(keySize + valueSize + uint64(actualKeySize) + actualValueSize + checksumSize), nil
}
// DecodeEntry decodes a serialized entry
func DecodeEntry(b []byte, e *internal.Entry, maxKeySize uint32, maxValueSize uint64) error {
valueOffset, _, err := getKeyValueSizes(b, maxKeySize, maxValueSize)
if err != nil {
return errors.Wrap(err, "key/value sizes are invalid")
}
decodeWithoutPrefix(b[keySize+valueSize:], valueOffset, e)
return nil
}
func getKeyValueSizes(buf []byte, maxKeySize uint32, maxValueSize uint64) (uint32, uint64, error) {
actualKeySize := binary.BigEndian.Uint32(buf[:keySize])
actualValueSize := binary.BigEndian.Uint64(buf[keySize:])
if actualKeySize > maxKeySize || actualValueSize > maxValueSize || actualKeySize == 0 {
return 0, 0, errInvalidKeyOrValueSize
}
return actualKeySize, actualValueSize, nil
}
func decodeWithoutPrefix(buf []byte, valueOffset uint32, v *internal.Entry) {
v.Key = buf[:valueOffset]
v.Value = buf[valueOffset : len(buf)-checksumSize]
v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:])
}
// IsCorruptedData indicates if the error corresponds to possible data corruption
func IsCorruptedData(err error) bool {
switch err {
case errCantDecodeOnNilEntry, errInvalidKeyOrValueSize, errTruncatedData:
return true
default:
return false
}
}
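For reference, the on-disk frame this decoder walks, per the size constants in this package (integers big-endian; the checksum is the Entry's CRC32):

```
+--------------+----------------+-----------+-------------+---------------+
| key len (4B) | value len (8B) | key bytes | value bytes | checksum (4B) |
+--------------+----------------+-----------+-------------+---------------+
```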

internal/data/codec/decoder_test.go Normal file

@@ -0,0 +1,109 @@
package codec
import (
"bytes"
"encoding/binary"
"io"
"testing"
"github.com/prologic/bitcask/internal"
"github.com/stretchr/testify/assert"
)
func TestDecodeOnNilEntry(t *testing.T) {
t.Parallel()
assert := assert.New(t)
decoder := NewDecoder(&bytes.Buffer{}, 1, 1)
_, err := decoder.Decode(nil)
if assert.Error(err) {
assert.Equal(errCantDecodeOnNilEntry, err)
}
}
func TestShortPrefix(t *testing.T) {
t.Parallel()
assert := assert.New(t)
maxKeySize, maxValueSize := uint32(10), uint64(20)
prefix := make([]byte, keySize+valueSize)
binary.BigEndian.PutUint32(prefix, 1)
binary.BigEndian.PutUint64(prefix[keySize:], 1)
truncBytesCount := 2
buf := bytes.NewBuffer(prefix[:keySize+valueSize-truncBytesCount])
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
_, err := decoder.Decode(&internal.Entry{})
if assert.Error(err) {
assert.Equal(io.ErrUnexpectedEOF, err)
}
}
func TestInvalidValueKeySizes(t *testing.T) {
assert := assert.New(t)
maxKeySize, maxValueSize := uint32(10), uint64(20)
tests := []struct {
keySize uint32
valueSize uint64
name string
}{
{keySize: 0, valueSize: 5, name: "zero key size"}, // a zero value size is valid (tombstones); a zero key size is not
{keySize: 11, valueSize: 5, name: "key size overflow"},
{keySize: 5, valueSize: 21, name: "value size overflow"},
{keySize: 11, valueSize: 21, name: "key and value size overflow"},
}
for i := range tests {
i := i
t.Run(tests[i].name, func(t *testing.T) {
t.Parallel()
prefix := make([]byte, keySize+valueSize)
binary.BigEndian.PutUint32(prefix, tests[i].keySize)
binary.BigEndian.PutUint64(prefix[keySize:], tests[i].valueSize)
buf := bytes.NewBuffer(prefix)
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
_, err := decoder.Decode(&internal.Entry{})
if assert.Error(err) {
assert.Equal(errInvalidKeyOrValueSize, err)
}
})
}
}
func TestTruncatedData(t *testing.T) {
assert := assert.New(t)
maxKeySize, maxValueSize := uint32(10), uint64(20)
key := []byte("foo")
value := []byte("bar")
data := make([]byte, keySize+valueSize+len(key)+len(value)+checksumSize)
binary.BigEndian.PutUint32(data, uint32(len(key)))
binary.BigEndian.PutUint64(data[keySize:], uint64(len(value)))
copy(data[keySize+valueSize:], key)
copy(data[keySize+valueSize+len(key):], value)
copy(data[keySize+valueSize+len(key)+len(value):], bytes.Repeat([]byte("0"), checksumSize))
tests := []struct {
data []byte
name string
}{
{data: data[:keySize+valueSize+len(key)-1], name: "truncated key"},
{data: data[:keySize+valueSize+len(key)+len(value)-1], name: "truncated value"},
{data: data[:keySize+valueSize+len(key)+len(value)+checksumSize-1], name: "truncated checksum"},
}
for i := range tests {
i := i
t.Run(tests[i].name, func(t *testing.T) {
t.Parallel()
buf := bytes.NewBuffer(tests[i].data)
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
_, err := decoder.Decode(&internal.Entry{})
if assert.Error(err) {
assert.Equal(errTruncatedData, err)
}
})
}
}


@@ -0,0 +1,57 @@
package codec
import (
"bufio"
"encoding/binary"
"io"
"github.com/pkg/errors"
"github.com/prologic/bitcask/internal"
)
const (
keySize = 4
valueSize = 8
checksumSize = 4
)
// NewEncoder creates a streaming Entry encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: bufio.NewWriter(w)}
}
// Encoder wraps an underlying io.Writer and allows you to stream
// Entry encodings on it.
type Encoder struct {
w *bufio.Writer
}
// Encode takes any Entry and streams it to the underlying writer.
// Messages are framed with a key-length and value-length prefix and
// terminated by a 4-byte checksum.
func (e *Encoder) Encode(msg internal.Entry) (int64, error) {
var bufKeyValue = make([]byte, keySize+valueSize)
binary.BigEndian.PutUint32(bufKeyValue[:keySize], uint32(len(msg.Key)))
binary.BigEndian.PutUint64(bufKeyValue[keySize:keySize+valueSize], uint64(len(msg.Value)))
if _, err := e.w.Write(bufKeyValue); err != nil {
return 0, errors.Wrap(err, "failed writing key & value length prefix")
}
if _, err := e.w.Write(msg.Key); err != nil {
return 0, errors.Wrap(err, "failed writing key data")
}
if _, err := e.w.Write(msg.Value); err != nil {
return 0, errors.Wrap(err, "failed writing value data")
}
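// reuse the first checksumSize bytes of the prefix buffer for the 4-byte checksum suffix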
bufChecksumSize := bufKeyValue[:checksumSize]
binary.BigEndian.PutUint32(bufChecksumSize, msg.Checksum)
if _, err := e.w.Write(bufChecksumSize); err != nil {
return 0, errors.Wrap(err, "failed writing checksum data")
}
if err := e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(keySize + valueSize + len(msg.Key) + len(msg.Value) + checksumSize), nil
}
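A minimal sketch of the resulting frame (assuming internal.NewEntry from this module, which computes the value checksum as shown elsewhere in this diff):

package main

import (
	"bytes"
	"fmt"

	"github.com/prologic/bitcask/internal"
	"github.com/prologic/bitcask/internal/data/codec"
)

func main() {
	var buf bytes.Buffer
	enc := codec.NewEncoder(&buf)
	// Frame: 4-byte key length | 8-byte value length | key | value | 4-byte checksum.
	n, err := enc.Encode(internal.NewEntry([]byte("k"), []byte("v")))
	if err != nil {
		panic(err)
	}
	fmt.Println(n, buf.Len()) // both print 18 (4+8+1+1+4)
}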


@@ -0,0 +1,29 @@
package codec
import (
"bytes"
"encoding/hex"
"testing"
"github.com/prologic/bitcask/internal"
"github.com/stretchr/testify/assert"
)
func TestEncode(t *testing.T) {
t.Parallel()
assert := assert.New(t)
var buf bytes.Buffer
encoder := NewEncoder(&buf)
_, err := encoder.Encode(internal.Entry{
Key: []byte("mykey"),
Value: []byte("myvalue"),
Checksum: 414141,
Offset: 424242,
})
expectedHex := "0000000500000000000000076d796b65796d7976616c7565000651bd"
if assert.NoError(err) {
assert.Equal(expectedHex, hex.EncodeToString(buf.Bytes()))
}
}
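The expected hex decomposes exactly per the frame layout: 00000005 is the 4-byte big-endian key length (5), 0000000000000007 the 8-byte value length (7), 6d796b6579 is "mykey", 6d7976616c7565 is "myvalue", and 000651bd is the checksum 414141 in big-endian. Note that the Offset field is not serialized.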

internal/data/datafile.go

@@ -0,0 +1,195 @@
package data
import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/pkg/errors"
"github.com/prologic/bitcask/internal"
"github.com/prologic/bitcask/internal/data/codec"
"golang.org/x/exp/mmap"
)
const (
defaultDatafileFilename = "%09d.data"
)
var (
errReadonly = errors.New("error: read only datafile")
errReadError = errors.New("error: read error")
mxMemPool sync.RWMutex
)
// Datafile is an interface that represents a readable and writeable datafile
type Datafile interface {
FileID() int
Name() string
Close() error
Sync() error
Size() int64
Read() (internal.Entry, int64, error)
ReadAt(index, size int64) (internal.Entry, error)
Write(internal.Entry) (int64, int64, error)
}
type datafile struct {
sync.RWMutex
id int
r *os.File
ra *mmap.ReaderAt
w *os.File
offset int64
dec *codec.Decoder
enc *codec.Encoder
maxKeySize uint32
maxValueSize uint64
}
// NewDatafile opens an existing datafile
func NewDatafile(path string, id int, readonly bool, maxKeySize uint32, maxValueSize uint64) (Datafile, error) {
var (
r *os.File
ra *mmap.ReaderAt
w *os.File
err error
)
fn := filepath.Join(path, fmt.Sprintf(defaultDatafileFilename, id))
if !readonly {
w, err = os.OpenFile(fn, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
if err != nil {
return nil, err
}
}
r, err = os.Open(fn)
if err != nil {
return nil, err
}
stat, err := r.Stat()
if err != nil {
return nil, errors.Wrap(err, "error calling Stat()")
}
ra, err = mmap.Open(fn)
if err != nil {
return nil, err
}
offset := stat.Size()
dec := codec.NewDecoder(r, maxKeySize, maxValueSize)
enc := codec.NewEncoder(w)
return &datafile{
id: id,
r: r,
ra: ra,
w: w,
offset: offset,
dec: dec,
enc: enc,
maxKeySize: maxKeySize,
maxValueSize: maxValueSize,
}, nil
}
func (df *datafile) FileID() int {
return df.id
}
func (df *datafile) Name() string {
return df.r.Name()
}
func (df *datafile) Close() error {
defer func() {
df.ra.Close()
df.r.Close()
}()
// Readonly datafile -- Nothing further to close on the write side
if df.w == nil {
return nil
}
err := df.Sync()
if err != nil {
return err
}
return df.w.Close()
}
func (df *datafile) Sync() error {
if df.w == nil {
return nil
}
return df.w.Sync()
}
func (df *datafile) Size() int64 {
df.RLock()
defer df.RUnlock()
return df.offset
}
// Read reads the next entry from the datafile
func (df *datafile) Read() (e internal.Entry, n int64, err error) {
df.Lock()
defer df.Unlock()
n, err = df.dec.Decode(&e)
if err != nil {
return
}
return
}
// ReadAt reads the entry at the given byte offset (index) and expected serialized size
func (df *datafile) ReadAt(index, size int64) (e internal.Entry, err error) {
var n int
b := make([]byte, size)
if df.w == nil {
n, err = df.ra.ReadAt(b, index)
} else {
n, err = df.r.ReadAt(b, index)
}
if err != nil {
return
}
if int64(n) != size {
err = errReadError
return
}
err = codec.DecodeEntry(b, &e, df.maxKeySize, df.maxValueSize)
return
}
func (df *datafile) Write(e internal.Entry) (int64, int64, error) {
if df.w == nil {
return -1, 0, errReadonly
}
df.Lock()
defer df.Unlock()
e.Offset = df.offset
n, err := df.enc.Encode(e)
if err != nil {
return -1, 0, err
}
df.offset += n
return e.Offset, n, nil
}
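As a usage sketch (not part of this diff): write one entry and read it back through the Datafile interface above; the directory is assumed to exist and the size limits are arbitrary.

package main

import (
	"fmt"

	"github.com/prologic/bitcask/internal"
	"github.com/prologic/bitcask/internal/data"
)

func main() {
	// Open datafile 0 in an existing directory, read-write.
	df, err := data.NewDatafile("/tmp/db", 0, false, 64, 1<<16)
	if err != nil {
		panic(err)
	}
	defer df.Close()

	offset, size, err := df.Write(internal.NewEntry([]byte("k"), []byte("v")))
	if err != nil {
		panic(err)
	}
	// ReadAt needs the exact offset and serialized size returned by Write.
	e, err := df.ReadAt(offset, size)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s=%s\n", e.Key, e.Value)
}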


@@ -1,175 +0,0 @@
package internal
import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/pkg/errors"
"golang.org/x/exp/mmap"
)
const (
DefaultDatafileFilename = "%09d.data"
)
var (
ErrReadonly = errors.New("error: read only datafile")
ErrReadError = errors.New("error: read error")
mxMemPool sync.RWMutex
)
type Datafile struct {
sync.RWMutex
id int
r *os.File
ra *mmap.ReaderAt
w *os.File
offset int64
dec *Decoder
enc *Encoder
}
func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
var (
r *os.File
ra *mmap.ReaderAt
w *os.File
err error
)
fn := filepath.Join(path, fmt.Sprintf(DefaultDatafileFilename, id))
if !readonly {
w, err = os.OpenFile(fn, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
if err != nil {
return nil, err
}
}
r, err = os.Open(fn)
if err != nil {
return nil, err
}
stat, err := r.Stat()
if err != nil {
return nil, errors.Wrap(err, "error calling Stat()")
}
ra, err = mmap.Open(fn)
if err != nil {
return nil, err
}
offset := stat.Size()
dec := NewDecoder(r)
enc := NewEncoder(w)
return &Datafile{
id: id,
r: r,
ra: ra,
w: w,
offset: offset,
dec: dec,
enc: enc,
}, nil
}
func (df *Datafile) FileID() int {
return df.id
}
func (df *Datafile) Name() string {
return df.r.Name()
}
func (df *Datafile) Close() error {
defer func() {
df.ra.Close()
df.r.Close()
}()
// Readonly Datafile -- Nothing further to close on the write side
if df.w == nil {
return nil
}
err := df.Sync()
if err != nil {
return err
}
return df.w.Close()
}
func (df *Datafile) Sync() error {
if df.w == nil {
return nil
}
return df.w.Sync()
}
func (df *Datafile) Size() int64 {
df.RLock()
defer df.RUnlock()
return df.offset
}
func (df *Datafile) Read() (e Entry, n int64, err error) {
df.Lock()
defer df.Unlock()
n, err = df.dec.Decode(&e)
if err != nil {
return
}
return
}
func (df *Datafile) ReadAt(index, size int64) (e Entry, err error) {
var n int
b := make([]byte, size)
if df.w == nil {
n, err = df.ra.ReadAt(b, index)
} else {
n, err = df.r.ReadAt(b, index)
}
if err != nil {
return
}
if int64(n) != size {
err = ErrReadError
return
}
valueOffset, _ := GetKeyValueSizes(b)
DecodeWithoutPrefix(b[KeySize+ValueSize:], valueOffset, &e)
return
}
func (df *Datafile) Write(e Entry) (int64, int64, error) {
if df.w == nil {
return -1, 0, ErrReadonly
}
df.Lock()
defer df.Unlock()
e.Offset = df.offset
n, err := df.enc.Encode(e)
if err != nil {
return -1, 0, err
}
df.offset += n
return e.Offset, n, nil
}


@@ -12,6 +12,7 @@ type Entry struct {
 Value []byte
 }
+// NewEntry creates a new `Entry` with the given `key` and `value`
 func NewEntry(key, value []byte) Entry {
 checksum := crc32.ChecksumIEEE(value)

@@ -0,0 +1,138 @@
package index
import (
"encoding/binary"
"io"
"github.com/pkg/errors"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
var (
errTruncatedKeySize = errors.New("key size is truncated")
errTruncatedKeyData = errors.New("key data is truncated")
errTruncatedData = errors.New("data is truncated")
errKeySizeTooLarge = errors.New("key size too large")
)
const (
int32Size = 4
int64Size = 8
fileIDSize = int32Size
offsetSize = int64Size
sizeSize = int64Size
)
func readKeyBytes(r io.Reader, maxKeySize uint32) ([]byte, error) {
s := make([]byte, int32Size)
_, err := io.ReadFull(r, s)
if err != nil {
if err == io.EOF {
return nil, err
}
return nil, errors.Wrap(errTruncatedKeySize, err.Error())
}
size := binary.BigEndian.Uint32(s)
if size > maxKeySize {
return nil, errKeySizeTooLarge
}
b := make([]byte, size)
_, err = io.ReadFull(r, b)
if err != nil {
return nil, errors.Wrap(errTruncatedKeyData, err.Error())
}
return b, nil
}
func writeBytes(b []byte, w io.Writer) error {
s := make([]byte, int32Size)
binary.BigEndian.PutUint32(s, uint32(len(b)))
_, err := w.Write(s)
if err != nil {
return err
}
_, err = w.Write(b)
if err != nil {
return err
}
return nil
}
func readItem(r io.Reader) (internal.Item, error) {
buf := make([]byte, (fileIDSize + offsetSize + sizeSize))
_, err := io.ReadFull(r, buf)
if err != nil {
return internal.Item{}, errors.Wrap(errTruncatedData, err.Error())
}
return internal.Item{
FileID: int(binary.BigEndian.Uint32(buf[:fileIDSize])),
Offset: int64(binary.BigEndian.Uint64(buf[fileIDSize:(fileIDSize + offsetSize)])),
Size: int64(binary.BigEndian.Uint64(buf[(fileIDSize + offsetSize):])),
}, nil
}
func writeItem(item internal.Item, w io.Writer) error {
buf := make([]byte, (fileIDSize + offsetSize + sizeSize))
binary.BigEndian.PutUint32(buf[:fileIDSize], uint32(item.FileID))
binary.BigEndian.PutUint64(buf[fileIDSize:(fileIDSize+offsetSize)], uint64(item.Offset))
binary.BigEndian.PutUint64(buf[(fileIDSize+offsetSize):], uint64(item.Size))
_, err := w.Write(buf)
if err != nil {
return err
}
return nil
}
// readIndex reads a persisted index from an io.Reader into a Tree
func readIndex(r io.Reader, t art.Tree, maxKeySize uint32) error {
for {
key, err := readKeyBytes(r, maxKeySize)
if err != nil {
if err == io.EOF {
break
}
return err
}
item, err := readItem(r)
if err != nil {
return err
}
t.Insert(key, item)
}
return nil
}
func writeIndex(t art.Tree, w io.Writer) (err error) {
t.ForEach(func(node art.Node) bool {
err = writeBytes(node.Key(), w)
if err != nil {
return false
}
item := node.Value().(internal.Item)
err = writeItem(item, w) // '=' not ':=': a new err here would shadow the named return and swallow the error
if err != nil {
return false
}
return true
})
return
}
// IsIndexCorruption returns a boolean indicating whether the error
// is known to report a data corruption issue
func IsIndexCorruption(err error) bool {
cause := errors.Cause(err)
switch cause {
case errKeySizeTooLarge, errTruncatedData, errTruncatedKeyData, errTruncatedKeySize:
return true
}
return false
}
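A hypothetical in-package sketch (readIndex/writeIndex are unexported, so it must live in package index): each serialized record is a 4-byte big-endian key length, the key bytes, then a 4-byte file ID, an 8-byte offset and an 8-byte size. The key, item and maxKeySize below are arbitrary.

package index

import (
	"bytes"
	"testing"

	art "github.com/plar/go-adaptive-radix-tree"
	"github.com/prologic/bitcask/internal"
)

// TestRoundTrip is illustrative only: serialize a one-key tree and read it back.
func TestRoundTrip(t *testing.T) {
	in := art.New()
	in.Insert([]byte("mykey"), internal.Item{FileID: 1, Offset: 0, Size: 18})

	var buf bytes.Buffer
	if err := writeIndex(in, &buf); err != nil {
		t.Fatal(err)
	}

	out := art.New()
	if err := readIndex(&buf, out, 64); err != nil {
		t.Fatal(err)
	}
	if _, found := out.Search([]byte("mykey")); !found {
		t.Fatal("key missing after round trip")
	}
}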


@@ -0,0 +1,126 @@
package index
import (
"bytes"
"encoding/base64"
"encoding/binary"
"testing"
"github.com/pkg/errors"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
const (
base64SampleTree = "AAAABGFiY2QAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAARhYmNlAAAAAQAAAAAAAAABAAAAAAAAAAEAAAAEYWJjZgAAAAIAAAAAAAAAAgAAAAAAAAACAAAABGFiZ2QAAAADAAAAAAAAAAMAAAAAAAAAAw=="
)
func TestWriteIndex(t *testing.T) {
at, expectedSerializedSize := getSampleTree()
var b bytes.Buffer
err := writeIndex(at, &b)
if err != nil {
t.Fatalf("writing index failed: %v", err)
}
if b.Len() != expectedSerializedSize {
t.Fatalf("incorrect size of serialied index: expected %d, got: %d", expectedSerializedSize, b.Len())
}
sampleTreeBytes, _ := base64.StdEncoding.DecodeString(base64SampleTree)
if !bytes.Equal(b.Bytes(), sampleTreeBytes) {
t.Fatalf("unexpected serialization of the tree")
}
}
func TestReadIndex(t *testing.T) {
sampleTreeBytes, _ := base64.StdEncoding.DecodeString(base64SampleTree)
b := bytes.NewBuffer(sampleTreeBytes)
at := art.New()
err := readIndex(b, at, 1024)
if err != nil {
t.Fatalf("error while deserializing correct sample tree: %v", err)
}
atsample, _ := getSampleTree()
if atsample.Size() != at.Size() {
t.Fatalf("trees aren't the same size, expected %v, got %v", atsample.Size(), at.Size())
}
atsample.ForEach(func(node art.Node) bool {
_, found := at.Search(node.Key())
if !found {
t.Fatalf("expected node wasn't found: %s", node.Key())
}
return true
})
}
func TestReadCorruptedData(t *testing.T) {
sampleBytes, _ := base64.StdEncoding.DecodeString(base64SampleTree)
t.Run("truncated", func(t *testing.T) {
table := []struct {
name string
err error
data []byte
}{
{name: "key-size-first-item", err: errTruncatedKeySize, data: sampleBytes[:2]},
{name: "key-data-second-item", err: errTruncatedKeyData, data: sampleBytes[:6]},
{name: "key-size-second-item", err: errTruncatedKeySize, data: sampleBytes[:(int32Size+4+fileIDSize+offsetSize+sizeSize)+2]},
{name: "key-data-second-item", err: errTruncatedKeyData, data: sampleBytes[:(int32Size+4+fileIDSize+offsetSize+sizeSize)+6]},
{name: "data", err: errTruncatedData, data: sampleBytes[:int32Size+4+(fileIDSize+offsetSize+sizeSize-3)]},
}
for i := range table {
t.Run(table[i].name, func(t *testing.T) {
bf := bytes.NewBuffer(table[i].data)
if err := readIndex(bf, art.New(), 1024); !IsIndexCorruption(err) || errors.Cause(err) != table[i].err {
t.Fatalf("expected %v, got %v", table[i].err, err)
}
})
}
})
t.Run("overflow", func(t *testing.T) {
overflowKeySize := make([]byte, len(sampleBytes))
copy(overflowKeySize, sampleBytes)
binary.BigEndian.PutUint32(overflowKeySize, 1025)
overflowDataSize := make([]byte, len(sampleBytes))
copy(overflowDataSize, sampleBytes)
binary.BigEndian.PutUint32(overflowDataSize[int32Size+4+fileIDSize+offsetSize:], 1025)
table := []struct {
name string
err error
maxKeySize uint32
data []byte
}{
{name: "key-data-overflow", err: errKeySizeTooLarge, maxKeySize: 1024, data: overflowKeySize},
}
for i := range table {
t.Run(table[i].name, func(t *testing.T) {
bf := bytes.NewBuffer(table[i].data)
if err := readIndex(bf, art.New(), table[i].maxKeySize); !IsIndexCorruption(err) || errors.Cause(err) != table[i].err {
t.Fatalf("expected %v, got %v", table[i].err, err)
}
})
}
})
}
func getSampleTree() (art.Tree, int) {
at := art.New()
keys := [][]byte{[]byte("abcd"), []byte("abce"), []byte("abcf"), []byte("abgd")}
expectedSerializedSize := 0
for i := range keys {
at.Insert(keys[i], internal.Item{FileID: i, Offset: int64(i), Size: int64(i)})
expectedSerializedSize += int32Size + len(keys[i]) + fileIDSize + offsetSize + sizeSize
}
return at, expectedSerializedSize
}

internal/index/index.go

@@ -0,0 +1,59 @@
package index
import (
"os"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
// Indexer is an interface for loading and saving the index (an Adaptive Radix Tree)
type Indexer interface {
Load(path string, maxkeySize uint32) (art.Tree, bool, error)
Save(t art.Tree, path string) error
}
// NewIndexer returns an instance of the default `Indexer` implementation
// which persists the index (an Adaptive Radix Tree) as a binary blob to a file
func NewIndexer() Indexer {
return &indexer{}
}
type indexer struct{}
func (i *indexer) Load(path string, maxKeySize uint32) (art.Tree, bool, error) {
t := art.New()
if !internal.Exists(path) {
return t, false, nil
}
f, err := os.Open(path)
if err != nil {
return t, true, err
}
defer f.Close()
if err := readIndex(f, t, maxKeySize); err != nil {
return t, true, err
}
return t, true, nil
}
func (i *indexer) Save(t art.Tree, path string) error {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
// close explicitly on each path: a deferred Close here would double-close
// the file after the final f.Close() below
if err := writeIndex(t, f); err != nil {
f.Close()
return err
}
if err := f.Sync(); err != nil {
f.Close()
return err
}
return f.Close()
}
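As a usage sketch (not part of this diff): load an index if one exists, otherwise build and persist it; the path, key, item and maxKeySize are arbitrary.

package main

import (
	"fmt"

	"github.com/prologic/bitcask/internal"
	"github.com/prologic/bitcask/internal/index"
)

func main() {
	indexer := index.NewIndexer()

	// Load reports whether an index file existed at the path.
	t, found, err := indexer.Load("/tmp/db/index", 64)
	if err != nil {
		panic(err)
	}
	if !found {
		t.Insert([]byte("mykey"), internal.Item{FileID: 0, Offset: 0, Size: 18})
		if err := indexer.Save(t, "/tmp/db/index"); err != nil {
			panic(err)
		}
	}
	fmt.Println("index size:", t.Size())
}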


@@ -1,5 +1,8 @@
 package internal
+// Item represents the location of the value on disk. This is used by the
+// internal Adaptive Radix Tree to hold an in-memory structure mapping keys to
+// the locations on disk where the value(s) can be read from.
 type Item struct {
 FileID int `json:"fileid"`
 Offset int64 `json:"offset"`

internal/mocks/datafile.go

@@ -0,0 +1,158 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import internal "github.com/prologic/bitcask/internal"
import mock "github.com/stretchr/testify/mock"
// Datafile is an autogenerated mock type for the Datafile type
type Datafile struct {
mock.Mock
}
// Close provides a mock function with given fields:
func (_m *Datafile) Close() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// FileID provides a mock function with given fields:
func (_m *Datafile) FileID() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// Name provides a mock function with given fields:
func (_m *Datafile) Name() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// Read provides a mock function with given fields:
func (_m *Datafile) Read() (internal.Entry, int64, error) {
ret := _m.Called()
var r0 internal.Entry
if rf, ok := ret.Get(0).(func() internal.Entry); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(internal.Entry)
}
var r1 int64
if rf, ok := ret.Get(1).(func() int64); ok {
r1 = rf()
} else {
r1 = ret.Get(1).(int64)
}
var r2 error
if rf, ok := ret.Get(2).(func() error); ok {
r2 = rf()
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// ReadAt provides a mock function with given fields: index, size
func (_m *Datafile) ReadAt(index int64, size int64) (internal.Entry, error) {
ret := _m.Called(index, size)
var r0 internal.Entry
if rf, ok := ret.Get(0).(func(int64, int64) internal.Entry); ok {
r0 = rf(index, size)
} else {
r0 = ret.Get(0).(internal.Entry)
}
var r1 error
if rf, ok := ret.Get(1).(func(int64, int64) error); ok {
r1 = rf(index, size)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Size provides a mock function with given fields:
func (_m *Datafile) Size() int64 {
ret := _m.Called()
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}
return r0
}
// Sync provides a mock function with given fields:
func (_m *Datafile) Sync() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Write provides a mock function with given fields: _a0
func (_m *Datafile) Write(_a0 internal.Entry) (int64, int64, error) {
ret := _m.Called(_a0)
var r0 int64
if rf, ok := ret.Get(0).(func(internal.Entry) int64); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(int64)
}
var r1 int64
if rf, ok := ret.Get(1).(func(internal.Entry) int64); ok {
r1 = rf(_a0)
} else {
r1 = ret.Get(1).(int64)
}
var r2 error
if rf, ok := ret.Get(2).(func(internal.Entry) error); ok {
r2 = rf(_a0)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
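A sketch of how these generated mocks are used with testify, e.g. to force a Sync failure in a test; the error message is arbitrary.

package mocks_test

import (
	"errors"
	"testing"

	"github.com/prologic/bitcask/internal/mocks"
	"github.com/stretchr/testify/assert"
)

func TestSyncFails(t *testing.T) {
	df := new(mocks.Datafile)
	df.On("Sync").Return(errors.New("boom"))

	// The mock returns the canned error instead of touching the filesystem.
	err := df.Sync()
	assert.EqualError(t, err, "boom")
	df.AssertExpectations(t)
}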

internal/mocks/indexer.go

@@ -0,0 +1,56 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import art "github.com/plar/go-adaptive-radix-tree"
import mock "github.com/stretchr/testify/mock"
// Indexer is an autogenerated mock type for the Indexer type
type Indexer struct {
mock.Mock
}
// Load provides a mock function with given fields: path, maxkeySize
func (_m *Indexer) Load(path string, maxkeySize uint32) (art.Tree, bool, error) {
ret := _m.Called(path, maxkeySize)
var r0 art.Tree
if rf, ok := ret.Get(0).(func(string, uint32) art.Tree); ok {
r0 = rf(path, maxkeySize)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(art.Tree)
}
}
var r1 bool
if rf, ok := ret.Get(1).(func(string, uint32) bool); ok {
r1 = rf(path, maxkeySize)
} else {
r1 = ret.Get(1).(bool)
}
var r2 error
if rf, ok := ret.Get(2).(func(string, uint32) error); ok {
r2 = rf(path, maxkeySize)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// Save provides a mock function with given fields: t, path
func (_m *Indexer) Save(t art.Tree, path string) error {
ret := _m.Called(t, path)
var r0 error
if rf, ok := ret.Get(0).(func(art.Tree, string) error); ok {
r0 = rf(t, path)
} else {
r0 = ret.Error(0)
}
return r0
}


@@ -9,11 +9,14 @@ import (
 "strings"
 )
+// Exists returns `true` if the given `path` on the current file system exists
 func Exists(path string) bool {
 _, err := os.Stat(path)
 return err == nil
 }
+// DirSize returns the space occupied by the given `path` on disk on the
+// current file system.
 func DirSize(path string) (int64, error) {
 var size int64
 err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
@@ -28,6 +31,9 @@ func DirSize(path string) (int64, error) {
 return size, err
 }
+// GetDatafiles returns a list of all data files stored in the database path
+// given by `path`. All datafiles are identified by the glob `*.data` and
+// the basename is represented by a monotonically increasing integer.
 func GetDatafiles(path string) ([]string, error) {
 fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
 if err != nil {
@@ -37,6 +43,8 @@ func GetDatafiles(path string) ([]string, error) {
 return fns, nil
 }
+// ParseIds parses a list of datafiles as returned by `GetDatafiles`,
+// extracts the id part and returns a slice of ints.
 func ParseIds(fns []string) ([]int, error) {
 var ids []int
 for _, fn := range fns {

@@ -1,101 +1,44 @@
 package bitcask
-import (
-"encoding/json"
-"io/ioutil"
-"path/filepath"
-)
+import "github.com/prologic/bitcask/internal/config"
 const (
 // DefaultMaxDatafileSize is the default maximum datafile size in bytes
 DefaultMaxDatafileSize = 1 << 20 // 1MB
 // DefaultMaxKeySize is the default maximum key size in bytes
-DefaultMaxKeySize = 64 // 64 bytes
+DefaultMaxKeySize = uint32(64) // 64 bytes
 // DefaultMaxValueSize is the default value size in bytes
-DefaultMaxValueSize = 1 << 16 // 65KB
+DefaultMaxValueSize = uint64(1 << 16) // 65KB
 // DefaultSync is the default file synchronization action
 DefaultSync = false
 )
 // Option is a function that takes a config struct and modifies it
-type Option func(*config) error
-type config struct {
-maxDatafileSize int
-maxKeySize int
-maxValueSize int
-sync bool
-}
-func (c *config) MarshalJSON() ([]byte, error) {
-return json.Marshal(struct {
-MaxDatafileSize int `json:"max_datafile_size"`
-MaxKeySize int `json:"max_key_size"`
-MaxValueSize int `json:"max_value_size"`
-Sync bool `json:"sync"`
-}{
-MaxDatafileSize: c.maxDatafileSize,
-MaxKeySize: c.maxKeySize,
-MaxValueSize: c.maxValueSize,
-Sync: c.sync,
-})
-}
-func getConfig(path string) (*config, error) {
-type Config struct {
-MaxDatafileSize int `json:"max_datafile_size"`
-MaxKeySize int `json:"max_key_size"`
-MaxValueSize int `json:"max_value_size"`
-Sync bool `json:"sync"`
-}
-var cfg Config
-data, err := ioutil.ReadFile(filepath.Join(path, "config.json"))
-if err != nil {
-return nil, err
-}
-if err := json.Unmarshal(data, &cfg); err != nil {
-return nil, err
-}
-return &config{
-maxDatafileSize: cfg.MaxDatafileSize,
-maxKeySize: cfg.MaxKeySize,
-maxValueSize: cfg.MaxValueSize,
-sync: cfg.Sync,
-}, nil
-}
-func newDefaultConfig() *config {
-return &config{
-maxDatafileSize: DefaultMaxDatafileSize,
-maxKeySize: DefaultMaxKeySize,
-maxValueSize: DefaultMaxValueSize,
-}
-}
+type Option func(*config.Config) error
 // WithMaxDatafileSize sets the maximum datafile size option
 func WithMaxDatafileSize(size int) Option {
-return func(cfg *config) error {
-cfg.maxDatafileSize = size
+return func(cfg *config.Config) error {
+cfg.MaxDatafileSize = size
 return nil
 }
 }
 // WithMaxKeySize sets the maximum key size option
-func WithMaxKeySize(size int) Option {
-return func(cfg *config) error {
-cfg.maxKeySize = size
+func WithMaxKeySize(size uint32) Option {
+return func(cfg *config.Config) error {
+cfg.MaxKeySize = size
 return nil
 }
 }
 // WithMaxValueSize sets the maximum value size option
-func WithMaxValueSize(size int) Option {
-return func(cfg *config) error {
-cfg.maxValueSize = size
+func WithMaxValueSize(size uint64) Option {
+return func(cfg *config.Config) error {
+cfg.MaxValueSize = size
 return nil
 }
 }
@@ -103,8 +46,17 @@ func WithMaxValueSize(size int) Option {
 // WithSync causes Sync() to be called on every key/value written increasing
 // durability and safety at the expense of performance
 func WithSync(sync bool) Option {
-return func(cfg *config) error {
-cfg.sync = sync
+return func(cfg *config.Config) error {
+cfg.Sync = sync
 return nil
 }
 }
+func newDefaultConfig() *config.Config {
+return &config.Config{
+MaxDatafileSize: DefaultMaxDatafileSize,
+MaxKeySize: DefaultMaxKeySize,
+MaxValueSize: DefaultMaxValueSize,
+Sync: DefaultSync,
+}
+}
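A usage sketch, assuming the package's public Open(path string, options ...Option) entry point, which is not shown in this diff: each Option mutates the default config before the store is opened.

package main

import (
	"log"

	"github.com/prologic/bitcask"
)

func main() {
	// The path and limits are illustrative.
	db, err := bitcask.Open(
		"/tmp/db",
		bitcask.WithMaxKeySize(128),
		bitcask.WithMaxValueSize(1<<20),
		bitcask.WithSync(true),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}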