Compare commits

..

46 Commits

Author SHA1 Message Date
James Mills
803b08949e Fix setup target in Makefile to install mockery correctly 2019-10-17 13:59:00 +10:00
James Mills
65e9317d26 Fix glfmt/golint issues 2019-10-14 16:55:47 +10:00
James Mills
c4e12e0019 Fix spelling mistake in README s/Sponser/Sponsor 2019-10-08 22:04:22 +10:00
Steve Mynott
029f901bb7 fix example (#106) 2019-09-28 08:08:45 +10:00
James Mills
af8bf54962 Add *.db to ignore future accidental commits of a bitcask db to the repo 2019-09-26 21:09:27 +10:00
James Mills
5ea05fb3c2 Add unit test for opening bad database with corrupted/invalid datafiles (#105) 2019-09-26 21:07:13 +10:00
James Mills
5fe19989d4 Update Drone CI test pipeline 2019-09-25 22:15:22 +10:00
Ignacio Hagopian
498ea4069c codebeat: Code quality improvement (#103)
* codebeat: improve & bugfix

* codebeat: refactor to improve readability

* bugfix

* bugfix

* internal/data/codec: improve code coverage
2019-09-24 07:19:07 +10:00
James Mills
42c2b810bf Update README.md 2019-09-22 21:26:52 +10:00
Ignacio Hagopian
16a7feb603 cmd/bitcask: add recovery tool for datafiles (#102)
* cmd/bitcask: refactor recovery index logic

* cmd/bitcask: first version of datafile recovery tool

* cmd/bitcask: finished recovery datafile tool

* cmd/bitcask: temporary script to test recovery tool

* cmd/bitcask: remove commited binary file

* cmd/bitcask: delete bash test
2019-09-21 18:33:36 -03:00
Ignacio Hagopian
f17187a5c7 Test for data corruption in datafile decoding (#99)
* internal/data: move codec to own subpackage

* internal/data/codec: check & test nil Entry Decode

* internal/data/decoder: test for short prefix error

* internal/data/codec: test invalid key & value sizes

* internal/data/codec: check & test for truncated data

* interna/data/codec: use assert for tests
2019-09-16 09:29:08 -03:00
Ignacio Hagopian
5be114adab Makefile setup & key/value coherent datatypes & refactoring (#98)
* internal/data: comment exported functions

* internal/data: make smaller codec exported api surface

* make key and value sizes serializing bubble up to everything

* Makefile setup & go mod tidy
2019-09-12 10:44:26 -03:00
Ignacio Hagopian
7e0fa151f7 fix test compilation (#97) 2019-09-10 06:25:09 +10:00
James Mills
d59d5ad8c2 Improves Test Coverage by covering error cases (#95)
* Add Unit  Test for testing a corrupted config

* Add Unit Test for testing errors from .Stats()

* Refactor  Datafile into an interface and add Unit Tests for testing Merge() errors

* Refactor indexer into an interface and add Unit Tests for .Close() errors

* Add Unit Tests for .Delete() errors

* Add Unit Tests for  testing Put/Get errors

* Add Unit Test for testing Open errors (bad path for example)

* Refactor out bitcask.writeConfig

* Add more tests for config errors

* Add unit test for options that might error

* Add more test cases for close errors

* Add test case for rotating datafiles

* Fix a possible data race in .Stats()

* Add test case for checksum errors

* Add test case for Sync errors with Put and WithSync enabled

* Refactor and use testify.mock for mocks and generate mocks for all interfaces

* Refactor TestCloseErrors

* Refactored TestDeleteErrors

* Refactored TestGetErrors

* Refactored TestPutErrors

* Refactored TestMergeErrors and fixed a bug with .Fold()

* Add test case for Scan() errors

* Apparently only Scan() can return nil Node()s?
2019-09-09 07:18:38 +10:00
Ignacio Hagopian
13e35b7acc bitcask: fix data races & use Encode() to serialize config (#94) 2019-09-07 09:09:08 +10:00
Ignacio Hagopian
0d3a9213ed cmd/bitcask: recovery tool (#92)
* cmd/bitcask: recovery tool

* refactor configuration & use it in recover tool
2019-09-07 07:57:30 +10:00
James Mills
f4fb4972ee Improves test coverage by adding some missing unit tests (#90)
* Add Unit Test for testing WithSync() option

* Add Unit Test for testing re-indexing

* Add Unit Test for testing re-indexing with deleted keys (tombstone values)
2019-09-04 22:45:04 +10:00
James Mills
1108840967 Refactor the bitcaskd (redis compatible server) sample to improve code quality (#88) 2019-09-04 22:44:33 +10:00
James Mills
003c3abc42 Update to Go 1.13 and update README with new benchmarks (#89) 2019-09-04 22:43:53 +10:00
Ignacio Hagopian
a2b5ae2287 fix: check of persisted index values (#91) 2019-09-04 22:42:32 +10:00
James Mills
1c7df7f9c7 Removed unused readConfig() (#87) 2019-09-04 21:25:31 +10:00
Ignacio Hagopian
93cc1d409f codec_index: check sizes, new tests for data corruption & refactor (#84)
* bitcask/codec_index: check key and data sizes

* codec_index: tests for key and data size overflows

* codec_index: simplify internal funcs for unused returns
2019-09-04 12:26:26 +10:00
James Mills
24ab3fbf27 Update README.md 2019-09-04 08:20:44 +10:00
Ignacio Hagopian
8041a4c1e7 Refactor and general tests for codec index (#83)
* codec_index: unexport const fields

* codec_index: unexport internal functions and doc exported ones

* codec_index: rename func & return errors for corruption

* codec_index: new test for ReadIndex, WriteIndex, and read corruption

* Update internal/codec_index.go

Co-Authored-By: James Mills <1290234+prologic@users.noreply.github.com>

* Update internal/codec_index.go

Co-Authored-By: James Mills <1290234+prologic@users.noreply.github.com>
2019-09-03 08:19:35 +10:00
James Mills
50d3971e86 Fixed a bug with incorrect offsets populating the trie (#82) 2019-09-02 19:44:11 +10:00
Ignacio Hagopian
0338755f8c fix readme typos (#81) 2019-09-02 10:22:08 +10:00
Ignacio Hagopian
877bf982b1 fix go vet (#80) 2019-09-02 10:20:56 +10:00
James Mills
abbbeb8e1d Replace keydir with ART trie (#75)
* Replace keydir with ART trie

* Address some review feedback

* Address review feedback (consts)
2019-09-02 08:38:56 +10:00
James Mills
36bc134b22 Fix a bug wit the decoder passing the wrong value for the value's offset into the buffer (#77) 2019-08-31 08:35:17 +10:00
James Mills
ea96b8afc0 Update README.md 2019-08-30 13:28:17 +10:00
James Mills
b3d6f734b6 Use an Adaptive Radix Tree (#71) 2019-08-30 08:13:24 +10:00
Ignacio Hagopian
55459a5c93 readme: fix stargazers link (#73) 2019-08-30 08:05:08 +10:00
James Mills
a20ee3e3d4 Update README.md 2019-08-26 08:45:15 +10:00
James Mills
cd27b84069 Create FUNDING.yml 2019-08-25 12:01:15 +10:00
James Mills
b28353de02 Update README.md 2019-08-18 22:34:14 +10:00
Awn
e8bee948bc Make optimised scan functionality optional (#68) 2019-08-16 10:51:59 +10:00
Xin Zhang
156d29e344 Fix typo (#65) 2019-08-14 20:53:08 +10:00
James Mills
c5a565cd82 Adds WithSync(...) option to turn on sync after write durability (#63)
* Added WithSync(...) option to turn  on sync after write durability

* Add Sync/NoSync benchmark variants for Put()
2019-08-12 06:47:46 +10:00
Ignacio Hagopian
8f56cffd86 codec: collapse write and save extra alloc (#64) 2019-08-12 06:47:26 +10:00
James Mills
7204a33512 Fix and cleanup some unnecessary internal sub-packages and duplication 2019-08-08 22:28:25 +10:00
James Mills
c7d101d34f Fixed missing model import for codec with undefined Entry 2019-08-08 20:14:27 +10:00
Awn
af43cfa8f1 Remove merge function (#60)
* tidy: clean up some leftovers

Fixes #56
Fixes #57
Fixes #58

* api: remove standalone merge function

Fixes #55
2019-08-08 19:51:45 +10:00
James Mills
110c5024ee Update README.md 2019-08-08 12:17:42 +10:00
Ignacio Hagopian
1f10b4026d inline hash function to save extra alloc (#53) 2019-08-08 09:52:31 +10:00
Ignacio Hagopian
fd179b4a86 custom high-performance encoder implementation (#52) 2019-08-08 09:21:46 +10:00
James Mills
755b1879b5 Use []byte byte slices as keys directly avoiding serialing string(s) (#46) (#51) 2019-08-08 08:14:48 +10:00
43 changed files with 2414 additions and 1077 deletions

View File

@@ -5,7 +5,7 @@ steps:
- name: build
image: golang:latest
commands:
- go test -v -short -cover -coverprofile=coverage.txt -coverpkg=$(go list) .
- go test -v -cover -coverprofile=coverage.txt -covermode=atomic -coverpkg=$(shell go list) -race .
- name: coverage
image: plugins/codecov

2
.github/FUNDING.yml vendored Normal file
View File

@@ -0,0 +1,2 @@
github: prologic
patreon: prologic

1
.gitignore vendored
View File

@@ -1,5 +1,6 @@
*~*
*.bak
*.db
/coverage.txt
/bitcask

View File

@@ -1,4 +1,4 @@
.PHONY: dev build generate install image release profile bench test clean
.PHONY: dev build generate install image release profile bench test clean setup
CGO_ENABLED=0
VERSION=$(shell git describe --abbrev=0 --tags)
@@ -39,6 +39,9 @@ profile: build
bench: build
@go test -v -benchmem -bench=. .
mocks:
@mockery -all -case underscore -output ./internal/mocks -recursive
test: build
@go test -v \
-cover -coverprofile=coverage.txt -covermode=atomic \
@@ -46,5 +49,8 @@ test: build
-race \
.
setup:
@go get github.com/vektra/mockery/...
clean:
@git clean -f -d -X

View File

@@ -3,8 +3,10 @@
[![Build Status](https://cloud.drone.io/api/badges/prologic/bitcask/status.svg)](https://cloud.drone.io/prologic/bitcask)
[![CodeCov](https://codecov.io/gh/prologic/bitcask/branch/master/graph/badge.svg)](https://codecov.io/gh/prologic/bitcask)
[![Go Report Card](https://goreportcard.com/badge/prologic/bitcask)](https://goreportcard.com/report/prologic/bitcask)
[![codebeat badge](https://codebeat.co/badges/15fba8a5-3044-4f40-936f-9e0f5d5d1fd9)](https://codebeat.co/projects/github-com-prologic-bitcask-master)
[![GoDoc](https://godoc.org/github.com/prologic/bitcask?status.svg)](https://godoc.org/github.com/prologic/bitcask)
[![GitHub license](https://img.shields.io/github/license/prologic/bitcask.svg)](https://github.com/prologic/bitcask)
[![Sourcegraph](https://sourcegraph.com/github.com/prologic/bitcask/-/badge.svg)](https://sourcegraph.com/github.com/prologic/bitcask?badge)
A high performance Key/Value store written in [Go](https://golang.org) with a predictable read/write performance and high throughput. Uses a [Bitcask](https://en.wikipedia.org/wiki/Bitcask) on-disk layout (LSM+WAL) similar to [Riak](https://riak.com/)
@@ -21,27 +23,11 @@ For a more feature-complete Redis-compatible server, distributed key/value store
## Development
1. Get the source
```#!sh
$ git clone https://github.com/prologic/bitcask.git
```
2. Install required tools
This library uses [Protobuf](https://github.com/protocolbuffers/protobuf) to serialize data on disk. Please follow the
instructions for installing `protobuf` on your system. You will also need the
following Go libraries/tools to generate Go code from Protobuf defs:
- [protoc-gen-go](https://github.com/golang/protobuf)
3. Build the project
```#!sh
$ make
```
This will invoke `go generate` and `go build`.
## Install
```#!sh
@@ -64,8 +50,8 @@ import "github.com/prologic/bitcask"
func main() {
db, _ := bitcask.Open("/tmp/db")
defer db.Close()
db.Put("Hello", []byte("World"))
val, _ := db.Get("Hello")
db.Put([]byte("Hello"), []byte("World"))
val, _ := db.Get([]byte("Hello"))
}
```
@@ -132,49 +118,61 @@ goos: darwin
goarch: amd64
pkg: github.com/prologic/bitcask
BenchmarkGet/128B-4 300000 3913 ns/op 32.71 MB/s 387 B/op 4 allocs/op
BenchmarkGet/128BWithPool-4 300000 4143 ns/op 30.89 MB/s 227 B/op 3 allocs/op
BenchmarkGet/256B-4 300000 3919 ns/op 65.31 MB/s 643 B/op 4 allocs/op
BenchmarkGet/256BWithPool-4 300000 4270 ns/op 59.95 MB/s 355 B/op 3 allocs/op
BenchmarkGet/512B-4 300000 4248 ns/op 120.52 MB/s 1187 B/op 4 allocs/op
BenchmarkGet/512BWithPool-4 300000 4676 ns/op 109.48 MB/s 611 B/op 3 allocs/op
BenchmarkGet/1K-4 200000 5248 ns/op 195.10 MB/s 2275 B/op 4 allocs/op
BenchmarkGet/1KWithPool-4 200000 5270 ns/op 194.28 MB/s 1123 B/op 3 allocs/op
BenchmarkGet/2K-4 200000 6229 ns/op 328.74 MB/s 4451 B/op 4 allocs/op
BenchmarkGet/2KWithPool-4 200000 6282 ns/op 325.99 MB/s 2147 B/op 3 allocs/op
BenchmarkGet/4K-4 200000 9027 ns/op 453.74 MB/s 9059 B/op 4 allocs/op
BenchmarkGet/4KWithPool-4 200000 8906 ns/op 459.87 MB/s 4195 B/op 3 allocs/op
BenchmarkGet/8K-4 100000 12024 ns/op 681.28 MB/s 17763 B/op 4 allocs/op
BenchmarkGet/8KWithPool-4 200000 11103 ns/op 737.79 MB/s 8291 B/op 3 allocs/op
BenchmarkGet/16K-4 100000 16844 ns/op 972.65 MB/s 34915 B/op 4 allocs/op
BenchmarkGet/16KWithPool-4 100000 14575 ns/op 1124.10 MB/s 16483 B/op 3 allocs/op
BenchmarkGet/32K-4 50000 27770 ns/op 1179.97 MB/s 73827 B/op 4 allocs/op
BenchmarkGet/32KWithPool-4 100000 24495 ns/op 1337.74 MB/s 32867 B/op 3 allocs/op
BenchmarkGet/128B-4 316515 3263 ns/op 39.22 MB/s 160 B/op 1 allocs/op
BenchmarkGet/256B-4 382551 3204 ns/op 79.90 MB/s 288 B/op 1 allocs/op
BenchmarkGet/512B-4 357216 3835 ns/op 133.51 MB/s 576 B/op 1 allocs/op
BenchmarkGet/1K-4 274958 4429 ns/op 231.20 MB/s 1152 B/op 1 allocs/op
BenchmarkGet/2K-4 227764 5013 ns/op 408.55 MB/s 2304 B/op 1 allocs/op
BenchmarkGet/4K-4 187557 5534 ns/op 740.15 MB/s 4864 B/op 1 allocs/op
BenchmarkGet/8K-4 153546 7652 ns/op 1070.56 MB/s 9472 B/op 1 allocs/op
BenchmarkGet/16K-4 115549 10272 ns/op 1594.95 MB/s 18432 B/op 1 allocs/op
BenchmarkGet/32K-4 69592 16405 ns/op 1997.39 MB/s 40960 B/op 1 allocs/op
BenchmarkPut/128B-4 100000 17492 ns/op 7.32 MB/s 441 B/op 6 allocs/op
BenchmarkPut/256B-4 100000 17234 ns/op 14.85 MB/s 571 B/op 6 allocs/op
BenchmarkPut/512B-4 100000 22837 ns/op 22.42 MB/s 861 B/op 6 allocs/op
BenchmarkPut/1K-4 50000 30333 ns/op 33.76 MB/s 1443 B/op 6 allocs/op
BenchmarkPut/2K-4 30000 45304 ns/op 45.21 MB/s 2606 B/op 6 allocs/op
BenchmarkPut/4K-4 20000 83953 ns/op 48.79 MB/s 5187 B/op 6 allocs/op
BenchmarkPut/8K-4 10000 142142 ns/op 57.63 MB/s 9845 B/op 6 allocs/op
BenchmarkPut/16K-4 5000 206722 ns/op 79.26 MB/s 18884 B/op 6 allocs/op
BenchmarkPut/32K-4 5000 361108 ns/op 90.74 MB/s 41582 B/op 7 allocs/op
BenchmarkPut/128BNoSync-4 123519 11094 ns/op 11.54 MB/s 49 B/op 2 allocs/op
BenchmarkPut/256BNoSync-4 84662 13398 ns/op 19.11 MB/s 50 B/op 2 allocs/op
BenchmarkPut/1KNoSync-4 46345 24855 ns/op 41.20 MB/s 58 B/op 2 allocs/op
BenchmarkPut/2KNoSync-4 28820 43817 ns/op 46.74 MB/s 68 B/op 2 allocs/op
BenchmarkPut/4KNoSync-4 13976 90059 ns/op 45.48 MB/s 89 B/op 2 allocs/op
BenchmarkPut/8KNoSync-4 7852 155101 ns/op 52.82 MB/s 130 B/op 2 allocs/op
BenchmarkPut/16KNoSync-4 4848 238113 ns/op 68.81 MB/s 226 B/op 2 allocs/op
BenchmarkPut/32KNoSync-4 2564 391483 ns/op 83.70 MB/s 377 B/op 3 allocs/op
BenchmarkScan-4 1000000 1679 ns/op 408 B/op 16 allocs/op
BenchmarkPut/128BSync-4 260 4611273 ns/op 0.03 MB/s 48 B/op 2 allocs/op
BenchmarkPut/256BSync-4 265 4665506 ns/op 0.05 MB/s 48 B/op 2 allocs/op
BenchmarkPut/1KSync-4 256 4757334 ns/op 0.22 MB/s 48 B/op 2 allocs/op
BenchmarkPut/2KSync-4 255 4996788 ns/op 0.41 MB/s 92 B/op 2 allocs/op
BenchmarkPut/4KSync-4 222 5136481 ns/op 0.80 MB/s 98 B/op 2 allocs/op
BenchmarkPut/8KSync-4 223 5530824 ns/op 1.48 MB/s 99 B/op 2 allocs/op
BenchmarkPut/16KSync-4 213 5717880 ns/op 2.87 MB/s 202 B/op 2 allocs/op
BenchmarkPut/32KSync-4 211 5835948 ns/op 5.61 MB/s 355 B/op 3 allocs/op
BenchmarkScan-4 568696 2036 ns/op 392 B/op 33 allocs/op
PASS
```
For 128B values:
* ~200,000 reads/sec
* ~50,000 writes/sec
* ~300,000 reads/sec
* ~90,000 writes/sec
* ~490,000 scans/sec
The full benchmark above shows linear performance as you increase key/value sizes. Memory pooling starts to become advantageous for larger values.
The full benchmark above shows linear performance as you increase key/value sizes.
## Stargazers over time
[![Stargazers over time](https://starcharts.herokuapp.com/prologic/bitcask.svg)](https://starcharts.herokuapp.com/prologic/bitcask)
## Support
Support the ongoing development of Bitcask!
**Sponser**
- Become a [Sponsor](https://www.patreon.com/prologic)
## Contributors
Thank you to all those that have contributed to this project, battle-tested it, used it in their own projects or pdocuts, fixed bugs, improved performance and even fix tiny tpyos in documentation! Thank you and keep contirbuting!
Thank you to all those that have contributed to this project, battle-tested it, used it in their own projects or products, fixed bugs, improved performance and even fix tiny typos in documentation! Thank you and keep contributing!
You can find an [AUTHORS](/AUTHORS) file where we keep a list of contributors to the project. If you contriibute a PR please consider adding your name there. There is also Github's own [Contributors](https://github.com/prologic/bitcask/graphs/contributors) statistics.

View File

@@ -1,7 +1,6 @@
package bitcask
import (
"encoding/json"
"errors"
"hash/crc32"
"io"
@@ -11,10 +10,12 @@ import (
"path/filepath"
"sync"
"github.com/derekparker/trie"
"github.com/gofrs/flock"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
"github.com/prologic/bitcask/internal/config"
"github.com/prologic/bitcask/internal/data"
"github.com/prologic/bitcask/internal/index"
)
var (
@@ -36,10 +37,6 @@ var (
// ErrDatabaseLocked is the error returned if the database is locked
// (typically opened by another process)
ErrDatabaseLocked = errors.New("error: database locked")
// ErrCreatingMemPool is the error returned when trying to configurate
// the mempool fails
ErrCreatingMemPool = errors.New("error: creating the mempool failed")
)
// Bitcask is a struct that represents a on-disk LSM and WAL data structure
@@ -50,13 +47,13 @@ type Bitcask struct {
*flock.Flock
config *config
config *config.Config
options []Option
path string
curr *internal.Datafile
keydir *internal.Keydir
datafiles map[int]*internal.Datafile
trie *trie.Trie
curr data.Datafile
datafiles map[int]data.Datafile
trie art.Tree
indexer index.Indexer
}
// Stats is a struct returned by Stats() on an open Bitcask instance
@@ -69,16 +66,14 @@ type Stats struct {
// Stats returns statistics about the database including the number of
// data files, keys and overall size on disk of the data
func (b *Bitcask) Stats() (stats Stats, err error) {
var size int64
size, err = internal.DirSize(b.path)
if err != nil {
if stats.Size, err = internal.DirSize(b.path); err != nil {
return
}
b.mu.RLock()
stats.Datafiles = len(b.datafiles)
stats.Keys = b.keydir.Len()
stats.Size = size
stats.Keys = b.trie.Size()
b.mu.RUnlock()
return
}
@@ -92,7 +87,7 @@ func (b *Bitcask) Close() error {
os.Remove(b.Flock.Path())
}()
if err := b.keydir.Save(path.Join(b.path, "index")); err != nil {
if err := b.indexer.Save(b.trie, filepath.Join(b.path, "index")); err != nil {
return err
}
@@ -112,14 +107,18 @@ func (b *Bitcask) Sync() error {
// Get retrieves the value of the given key. If the key is not found or an/I/O
// error occurs a null byte slice is returned along with the error.
func (b *Bitcask) Get(key string) ([]byte, error) {
var df *internal.Datafile
func (b *Bitcask) Get(key []byte) ([]byte, error) {
var df data.Datafile
item, ok := b.keydir.Get(key)
if !ok {
b.mu.RLock()
value, found := b.trie.Search(key)
if !found {
b.mu.RUnlock()
return nil, ErrKeyNotFound
}
item := value.(internal.Item)
if item.FileID == b.curr.FileID() {
df = b.curr
} else {
@@ -127,6 +126,7 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
}
e, err := df.ReadAt(item.Offset, item.Size)
b.mu.RUnlock()
if err != nil {
return nil, err
}
@@ -140,41 +140,54 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
}
// Has returns true if the key exists in the database, false otherwise.
func (b *Bitcask) Has(key string) bool {
_, ok := b.keydir.Get(key)
return ok
func (b *Bitcask) Has(key []byte) bool {
b.mu.RLock()
_, found := b.trie.Search(key)
b.mu.RUnlock()
return found
}
// Put stores the key and value in the database.
func (b *Bitcask) Put(key string, value []byte) error {
if len(key) > b.config.maxKeySize {
func (b *Bitcask) Put(key, value []byte) error {
if uint32(len(key)) > b.config.MaxKeySize {
return ErrKeyTooLarge
}
if len(value) > b.config.maxValueSize {
if uint64(len(value)) > b.config.MaxValueSize {
return ErrValueTooLarge
}
b.mu.Lock()
offset, n, err := b.put(key, value)
if err != nil {
b.mu.Unlock()
return err
}
item := b.keydir.Add(key, b.curr.FileID(), offset, n)
b.trie.Add(key, item)
if b.config.Sync {
if err := b.curr.Sync(); err != nil {
b.mu.Unlock()
return err
}
}
item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
b.trie.Insert(key, item)
b.mu.Unlock()
return nil
}
// Delete deletes the named key. If the key doesn't exist or an I/O error
// occurs the error is returned.
func (b *Bitcask) Delete(key string) error {
func (b *Bitcask) Delete(key []byte) error {
b.mu.Lock()
_, _, err := b.put(key, []byte{})
if err != nil {
b.mu.Unlock()
return err
}
b.keydir.Delete(key)
b.trie.Remove(key)
b.trie.Delete(key)
b.mu.Unlock()
return nil
}
@@ -182,44 +195,65 @@ func (b *Bitcask) Delete(key string) error {
// Scan performs a prefix scan of keys matching the given prefix and calling
// the function `f` with the keys found. If the function returns an error
// no further keys are processed and the first error returned.
func (b *Bitcask) Scan(prefix string, f func(key string) error) error {
keys := b.trie.PrefixSearch(prefix)
for _, key := range keys {
if err := f(key); err != nil {
return err
func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
// Skip the root node
if len(node.Key()) == 0 {
return true
}
}
return nil
if err = f(node.Key()); err != nil {
return false
}
return true
})
return
}
// Len returns the total number of keys in the database
func (b *Bitcask) Len() int {
return b.keydir.Len()
b.mu.RLock()
defer b.mu.RUnlock()
return b.trie.Size()
}
// Keys returns all keys in the database as a channel of string(s)
func (b *Bitcask) Keys() chan string {
return b.keydir.Keys()
// Keys returns all keys in the database as a channel of keys
func (b *Bitcask) Keys() chan []byte {
ch := make(chan []byte)
go func() {
b.mu.RLock()
defer b.mu.RUnlock()
for it := b.trie.Iterator(); it.HasNext(); {
node, _ := it.Next()
ch <- node.Key()
}
close(ch)
}()
return ch
}
// Fold iterates over all keys in the database calling the function `f` for
// each key. If the function returns an error, no further keys are processed
// and the error returned.
func (b *Bitcask) Fold(f func(key string) error) error {
for key := range b.keydir.Keys() {
if err := f(key); err != nil {
return err
func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
b.mu.RLock()
defer b.mu.RUnlock()
b.trie.ForEach(func(node art.Node) bool {
if err = f(node.Key()); err != nil {
return false
}
}
return nil
return true
})
return
}
func (b *Bitcask) put(key string, value []byte) (int64, int64, error) {
b.mu.Lock()
defer b.mu.Unlock()
func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
size := b.curr.Size()
if size >= int64(b.config.maxDatafileSize) {
if size >= int64(b.config.MaxDatafileSize) {
err := b.curr.Close()
if err != nil {
return -1, 0, err
@@ -227,7 +261,7 @@ func (b *Bitcask) put(key string, value []byte) (int64, int64, error) {
id := b.curr.FileID()
df, err := internal.NewDatafile(b.path, id, true)
df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize)
if err != nil {
return -1, 0, err
}
@@ -235,7 +269,7 @@ func (b *Bitcask) put(key string, value []byte) (int64, int64, error) {
b.datafiles[id] = df
id = b.curr.FileID() + 1
curr, err := internal.NewDatafile(b.path, id, false)
curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize)
if err != nil {
return -1, 0, err
}
@@ -246,103 +280,29 @@ func (b *Bitcask) put(key string, value []byte) (int64, int64, error) {
return b.curr.Write(e)
}
func (b *Bitcask) readConfig() error {
if internal.Exists(filepath.Join(b.path, "config.json")) {
data, err := ioutil.ReadFile(filepath.Join(b.path, "config.json"))
if err != nil {
return err
}
if err := json.Unmarshal(data, &b.config); err != nil {
return err
}
}
return nil
}
func (b *Bitcask) writeConfig() error {
data, err := json.Marshal(b.config)
if err != nil {
return err
}
return ioutil.WriteFile(filepath.Join(b.path, "config.json"), data, 0600)
}
func (b *Bitcask) reopen() error {
b.mu.Lock()
defer b.mu.Unlock()
fns, err := internal.GetDatafiles(b.path)
datafiles, lastID, err := loadDatafiles(b.path, b.config.MaxKeySize, b.config.MaxValueSize)
if err != nil {
return err
}
ids, err := internal.ParseIds(fns)
t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles)
if err != nil {
return err
}
datafiles := make(map[int]*internal.Datafile, len(ids))
for _, id := range ids {
df, err := internal.NewDatafile(b.path, id, true)
if err != nil {
return err
}
datafiles[id] = df
}
keydir := internal.NewKeydir()
trie := trie.New()
if internal.Exists(path.Join(b.path, "index")) {
if err := keydir.Load(path.Join(b.path, "index")); err != nil {
return err
}
for key := range keydir.Keys() {
item, _ := keydir.Get(key)
trie.Add(key, item)
}
} else {
for i, df := range datafiles {
for {
e, n, err := df.Read()
if err != nil {
if err == io.EOF {
break
}
return err
}
// Tombstone value (deleted key)
if len(e.Value) == 0 {
keydir.Delete(e.Key)
continue
}
item := keydir.Add(e.Key, ids[i], e.Offset, n)
trie.Add(e.Key, item)
}
}
}
var id int
if len(ids) > 0 {
id = ids[(len(ids) - 1)]
}
curr, err := internal.NewDatafile(b.path, id, false)
curr, err := data.NewDatafile(b.path, lastID, false, b.config.MaxKeySize, b.config.MaxValueSize)
if err != nil {
return err
}
b.trie = t
b.curr = curr
b.datafiles = datafiles
b.keydir = keydir
b.trie = trie
return nil
}
@@ -366,7 +326,7 @@ func (b *Bitcask) Merge() error {
// Rewrite all key/value pairs into merged database
// Doing this automatically strips deleted keys and
// old key/value pairs
err = b.Fold(func(key string) error {
err = b.Fold(func(key []byte) error {
value, err := b.Get(key)
if err != nil {
return err
@@ -431,7 +391,7 @@ func (b *Bitcask) Merge() error {
// configuration options as functions.
func Open(path string, options ...Option) (*Bitcask, error) {
var (
cfg *config
cfg *config.Config
err error
)
@@ -439,8 +399,13 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return nil, err
}
cfg, err = getConfig(path)
if err != nil {
configPath := filepath.Join(path, "config.json")
if internal.Exists(configPath) {
cfg, err = config.Load(configPath)
if err != nil {
return nil, err
}
} else {
cfg = newDefaultConfig()
}
@@ -449,6 +414,7 @@ func Open(path string, options ...Option) (*Bitcask, error) {
config: cfg,
options: options,
path: path,
indexer: index.NewIndexer(),
}
for _, opt := range options {
@@ -457,8 +423,6 @@ func Open(path string, options ...Option) (*Bitcask, error) {
}
}
internal.ConfigureMemPool(bitcask.config.maxConcurrency)
locked, err := bitcask.Flock.TryLock()
if err != nil {
return nil, err
@@ -468,7 +432,7 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return nil, ErrDatabaseLocked
}
if err := bitcask.writeConfig(); err != nil {
if err := cfg.Save(configPath); err != nil {
return nil, err
}
@@ -479,15 +443,59 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return bitcask, nil
}
// Merge calls Bitcask.Merge()
// XXX: Deprecated; Please use the `.Merge()` method
// XXX: This is only kept here for backwards compatibility
// it will be removed in future releases at some point
func Merge(path string, force bool) error {
db, err := Open(path)
func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64) (datafiles map[int]data.Datafile, lastID int, err error) {
fns, err := internal.GetDatafiles(path)
if err != nil {
return err
return nil, 0, err
}
return db.Merge()
ids, err := internal.ParseIds(fns)
if err != nil {
return nil, 0, err
}
datafiles = make(map[int]data.Datafile, len(ids))
for _, id := range ids {
datafiles[id], err = data.NewDatafile(path, id, true, maxKeySize, maxValueSize)
if err != nil {
return
}
}
if len(ids) > 0 {
lastID = ids[len(ids)-1]
}
return
}
func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile) (art.Tree, error) {
t, found, err := indexer.Load(filepath.Join(path, "index"), maxKeySize)
if err != nil {
return nil, err
}
if !found {
for _, df := range datafiles {
var offset int64
for {
e, n, err := df.Read()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
// Tombstone value (deleted key)
if len(e.Value) == 0 {
t.Delete(e.Key)
offset += n
continue
}
item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
t.Insert(e.Key, item)
offset += n
}
}
}
return t, nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -37,7 +37,7 @@ func del(path, key string) int {
}
defer db.Close()
err = db.Delete(key)
err = db.Delete([]byte(key))
if err != nil {
log.WithError(err).Error("error deleting key")
return 1

View File

@@ -50,11 +50,11 @@ func init() {
"with-max-datafile-size", "", bitcask.DefaultMaxDatafileSize,
"Maximum size of each datafile",
)
exportCmd.PersistentFlags().IntP(
exportCmd.PersistentFlags().Uint32P(
"with-max-key-size", "", bitcask.DefaultMaxKeySize,
"Maximum size of each key",
)
exportCmd.PersistentFlags().IntP(
exportCmd.PersistentFlags().Uint64P(
"with-max-value-size", "", bitcask.DefaultMaxValueSize,
"Maximum size of each value",
)
@@ -66,11 +66,6 @@ type kvPair struct {
}
func export(path, output string) int {
var (
err error
w io.WriteCloser
)
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")
@@ -78,19 +73,29 @@ func export(path, output string) int {
}
defer db.Close()
if output == "-" {
w = os.Stdout
} else {
w, err = os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_TRUNC, 0755)
if err != nil {
w := os.Stdout
if output != "-" {
if w, err = os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_TRUNC, 0755); err != nil {
log.WithError(err).
WithField("output", output).
Error("error opening output for writing")
return 1
}
defer w.Close()
}
err = db.Fold(func(key string) error {
if err = db.Fold(exportKey(db, w)); err != nil {
log.WithError(err).
WithField("path", path).
WithField("output", output).
Error("error exporting keys")
return 2
}
return 0
}
func exportKey(db *bitcask.Bitcask, w io.Writer) func(key []byte) error {
return func(key []byte) error {
value, err := db.Get(key)
if err != nil {
log.WithError(err).
@@ -129,14 +134,5 @@ func export(path, output string) int {
}
return nil
})
if err != nil {
log.WithError(err).
WithField("path", path).
WithField("output", output).
Error("error exporting keys")
return 2
}
return 0
}

View File

@@ -38,7 +38,7 @@ func get(path, key string) int {
}
defer db.Close()
value, err := db.Get(key)
value, err := db.Get([]byte(key))
if err != nil {
log.WithError(err).Error("error reading key")
return 1

View File

@@ -89,7 +89,7 @@ func _import(path, input string) int {
return 2
}
if err := db.Put(string(key), value); err != nil {
if err := db.Put(key, value); err != nil {
log.WithError(err).Error("error writing key/value")
return 2
}

View File

@@ -30,8 +30,8 @@ var initdbCmd = &cobra.Command{
path := viper.GetString("path")
maxDatafileSize := viper.GetInt("with-max-datafile-size")
maxKeySize := viper.GetInt("with-max-key-size")
maxValueSize := viper.GetInt("with-max-value-size")
maxKeySize := viper.GetUint32("with-max-key-size")
maxValueSize := viper.GetUint64("with-max-value-size")
db, err := bitcask.Open(
path,
@@ -56,11 +56,11 @@ func init() {
"with-max-datafile-size", "", bitcask.DefaultMaxDatafileSize,
"Maximum size of each datafile",
)
initdbCmd.PersistentFlags().IntP(
initdbCmd.PersistentFlags().Uint32P(
"with-max-key-size", "", bitcask.DefaultMaxKeySize,
"Maximum size of each key",
)
initdbCmd.PersistentFlags().IntP(
initdbCmd.PersistentFlags().Uint64P(
"with-max-value-size", "", bitcask.DefaultMaxValueSize,
"Maximum size of each value",
)

View File

@@ -36,8 +36,8 @@ func keys(path string) int {
}
defer db.Close()
err = db.Fold(func(key string) error {
fmt.Printf("%s\n", key)
err = db.Fold(func(key []byte) error {
fmt.Printf("%s\n", string(key))
return nil
})
if err != nil {

View File

@@ -55,7 +55,7 @@ func put(path, key string, value io.Reader) int {
return 1
}
err = db.Put(key, data)
err = db.Put([]byte(key), data)
if err != nil {
log.WithError(err).Error("error writing key")
return 1

141
cmd/bitcask/recover.go Normal file
View File

@@ -0,0 +1,141 @@
package main
import (
"fmt"
"io"
"os"
"path/filepath"
"github.com/prologic/bitcask"
"github.com/prologic/bitcask/internal"
"github.com/prologic/bitcask/internal/config"
"github.com/prologic/bitcask/internal/data/codec"
"github.com/prologic/bitcask/internal/index"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var recoveryCmd = &cobra.Command{
Use: "recover",
Aliases: []string{"recovery"},
Short: "Analyzes and recovers the index file for corruption scenarios",
Long: `This analyze files to detect different forms of persistence corruption in
persisted files. It also allows to recover the files to the latest point of integrity.
Recovered files have the .recovered extension`,
Args: cobra.ExactArgs(0),
PreRun: func(cmd *cobra.Command, args []string) {
viper.BindPFlag("dry-run", cmd.Flags().Lookup("dry-run"))
},
Run: func(cmd *cobra.Command, args []string) {
path := viper.GetString("path")
dryRun := viper.GetBool("dry-run")
os.Exit(recover(path, dryRun))
},
}
func init() {
RootCmd.AddCommand(recoveryCmd)
recoveryCmd.Flags().BoolP("dry-run", "n", false, "Will only check files health without applying recovery if unhealthy")
}
func recover(path string, dryRun bool) int {
maxKeySize := bitcask.DefaultMaxKeySize
maxValueSize := bitcask.DefaultMaxValueSize
if cfg, err := config.Load(filepath.Join(path, "config.json")); err == nil {
maxKeySize = cfg.MaxKeySize
maxValueSize = cfg.MaxValueSize
}
if err := recoverIndex(filepath.Join(path, "index"), maxKeySize, dryRun); err != nil {
log.WithError(err).Info("recovering index file")
return 1
}
datafiles, err := internal.GetDatafiles(path)
if err != nil {
log.WithError(err).Info("coudn't list existing datafiles")
return 1
}
for _, file := range datafiles {
if err := recoverDatafile(file, maxKeySize, maxValueSize, dryRun); err != nil {
log.WithError(err).Info("recovering data file")
return 1
}
}
return 0
}
func recoverIndex(path string, maxKeySize uint32, dryRun bool) error {
t, found, err := index.NewIndexer().Load(path, maxKeySize)
if err != nil && !index.IsIndexCorruption(err) {
log.WithError(err).Info("opening the index file")
}
if !found {
log.Info("index file doesn't exist, will be recreated on next run.")
return nil
}
if err == nil {
log.Debug("index file is not corrupted")
return nil
}
log.Debugf("index file is corrupted: %v", err)
if dryRun {
log.Debug("dry-run mode, not writing to a file")
return nil
}
// Leverage that t has the partiatially read tree even on corrupted files
err = index.NewIndexer().Save(t, "index.recovered")
if err != nil {
return fmt.Errorf("writing the recovered index file: %w", err)
}
log.Debug("the index was recovered in the index.recovered new file")
return nil
}
func recoverDatafile(path string, maxKeySize uint32, maxValueSize uint64, dryRun bool) error {
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("opening the datafile: %w", err)
}
defer f.Close()
_, file := filepath.Split(path)
fr, err := os.OpenFile(fmt.Sprintf("%s.recovered", file), os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return fmt.Errorf("creating the recovered datafile: %w", err)
}
defer fr.Close()
dec := codec.NewDecoder(f, maxKeySize, maxValueSize)
enc := codec.NewEncoder(fr)
e := internal.Entry{}
for {
_, err = dec.Decode(&e)
if err == io.EOF {
break
}
if codec.IsCorruptedData(err) {
log.Debugf("%s is corrupted, a best-effort recovery was done", file)
return nil
}
if err != nil {
return fmt.Errorf("unexpected error while reading datafile: %w", err)
}
if dryRun {
continue
}
if _, err := enc.Encode(e); err != nil {
return fmt.Errorf("writing to recovered datafile: %w", err)
}
}
if err := os.Remove(fr.Name()); err != nil {
return fmt.Errorf("can't remove temporal recovered datafile: %w", err)
}
log.Debugf("%s is not corrupted", file)
return nil
}

View File

@@ -14,7 +14,7 @@ import (
var scanCmd = &cobra.Command{
Use: "scan <prefix>",
Aliases: []string{"search", "find"},
Short: "Perform a prefis scan for keys",
Short: "Perform a prefix scan for keys",
Long: `This performa a prefix scan for keys starting with the given
prefix. This uses a Trie to search for matching keys and returns all matched
keys.`,
@@ -40,7 +40,7 @@ func scan(path, prefix string) int {
}
defer db.Close()
err = db.Scan(prefix, func(key string) error {
err = db.Scan([]byte(prefix), func(key []byte) error {
value, err := db.Get(key)
if err != nil {
log.WithError(err).Error("error reading key")

View File

@@ -3,26 +3,22 @@ package main
import (
"fmt"
"os"
"strings"
log "github.com/sirupsen/logrus"
flag "github.com/spf13/pflag"
"github.com/tidwall/redcon"
"github.com/prologic/bitcask"
"github.com/prologic/bitcask/internal"
)
var (
bind string
debug bool
version bool
maxDatafileSize int
bind string
debug bool
version bool
)
func init() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [options] <path>\n", os.Args[0])
fmt.Fprintf(os.Stderr, "Usage: %s [options] <dbpath>\n", os.Args[0])
flag.PrintDefaults()
}
@@ -30,8 +26,6 @@ func init() {
flag.BoolVarP(&debug, "debug", "d", false, "enable debug logging")
flag.StringVarP(&bind, "bind", "b", ":6379", "interface and port to bind to")
flag.IntVar(&maxDatafileSize, "max-datafile-size", 1<<20, "maximum datafile size in bytes")
}
func main() {
@@ -55,86 +49,11 @@ func main() {
path := flag.Arg(0)
db, err := bitcask.Open(path, bitcask.WithMaxDatafileSize(maxDatafileSize))
server, err := newServer(bind, path)
if err != nil {
log.WithError(err).WithField("path", path).Error("error opening database")
os.Exit(1)
log.WithError(err).Error("error creating server")
os.Exit(2)
}
log.WithField("bind", bind).WithField("path", path).Infof("starting bitcaskd v%s", internal.FullVersion())
err = redcon.ListenAndServe(bind,
func(conn redcon.Conn, cmd redcon.Command) {
switch strings.ToLower(string(cmd.Args[0])) {
case "ping":
conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
case "set":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
value := cmd.Args[2]
err = db.Put(key, value)
if err != nil {
conn.WriteString(fmt.Sprintf("ERR: %s", err))
} else {
conn.WriteString("OK")
}
case "get":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
value, err := db.Get(key)
if err != nil {
conn.WriteNull()
} else {
conn.WriteBulk(value)
}
case "keys":
conn.WriteArray(db.Len())
for key := range db.Keys() {
conn.WriteBulk([]byte(key))
}
case "exists":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
if db.Has(key) {
conn.WriteInt(1)
} else {
conn.WriteInt(0)
}
case "del":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
err := db.Delete(key)
if err != nil {
conn.WriteInt(0)
} else {
conn.WriteInt(1)
}
default:
conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
}
},
func(conn redcon.Conn) bool {
return true
},
func(conn redcon.Conn, err error) {
},
)
if err != nil {
log.WithError(err).Fatal("oops")
}
log.Fatal(server.Run())
}

122
cmd/bitcaskd/server.go Normal file
View File

@@ -0,0 +1,122 @@
package main
import (
"fmt"
"strings"
log "github.com/sirupsen/logrus"
"github.com/tidwall/redcon"
"github.com/prologic/bitcask"
)
type server struct {
bind string
db *bitcask.Bitcask
}
func newServer(bind, dbpath string) (*server, error) {
db, err := bitcask.Open(dbpath)
if err != nil {
log.WithError(err).WithField("dbpath", dbpath).Error("error opening database")
return nil, err
}
return &server{
bind: bind,
db: db,
}, nil
}
func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) {
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
value := cmd.Args[2]
if err := s.db.Put(key, value); err != nil {
conn.WriteString(fmt.Sprintf("ERR: %s", err))
} else {
conn.WriteString("OK")
}
}
func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) {
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
value, err := s.db.Get(key)
if err != nil {
conn.WriteNull()
} else {
conn.WriteBulk(value)
}
}
func (s *server) handleKeys(cmd redcon.Command, conn redcon.Conn) {
conn.WriteArray(s.db.Len())
for key := range s.db.Keys() {
conn.WriteBulk([]byte(key))
}
}
func (s *server) handleExists(cmd redcon.Command, conn redcon.Conn) {
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
if s.db.Has(key) {
conn.WriteInt(1)
} else {
conn.WriteInt(0)
}
}
func (s *server) handleDel(cmd redcon.Command, conn redcon.Conn) {
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := cmd.Args[1]
if err := s.db.Delete(key); err != nil {
conn.WriteInt(0)
} else {
conn.WriteInt(1)
}
}
func (s *server) Run() (err error) {
err = redcon.ListenAndServe(s.bind,
func(conn redcon.Conn, cmd redcon.Command) {
switch strings.ToLower(string(cmd.Args[0])) {
case "ping":
conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
case "set":
s.handleSet(cmd, conn)
case "get":
s.handleGet(cmd, conn)
case "keys":
s.handleKeys(cmd, conn)
case "exists":
s.handleExists(cmd, conn)
case "del":
s.handleDel(cmd, conn)
default:
conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
}
},
func(conn redcon.Conn) bool {
return true
},
func(conn redcon.Conn, err error) {
},
)
return
}

10
doc.go
View File

@@ -1,13 +1,3 @@
// Package bitcask implements a high-performance key-value store based on a
// WAL and LSM.
//
// By default, the client assumes a default configuration regarding maximum key size,
// maximum value size, maximum datafile size, and memory pools to avoid allocations.
// Refer to Constants section to know default values.
//
// For extra performance, configure the memory pool option properly. This option
// requires to specify the maximum number of concurrent use of the package. Failing to
// set a high-enough value would impact latency and throughput. Likewise, overestimating
// would yield in an unnecessary big memory footprint.
// The default configuration doesn't use a memory pool.
package bitcask

View File

@@ -8,7 +8,6 @@ func Example_withOptions() {
opts := []Option{
WithMaxKeySize(1024),
WithMaxValueSize(4096),
WithMemPool(10),
}
_, _ = Open("path/to/db", opts...)
}

11
go.mod
View File

@@ -1,15 +1,14 @@
module github.com/prologic/bitcask
go 1.13
require (
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d
github.com/gofrs/flock v0.7.1
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.2
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/pelletier/go-toml v1.4.0 // indirect
github.com/pkg/errors v0.8.1
github.com/plar/go-adaptive-radix-tree v1.0.1
github.com/sirupsen/logrus v1.4.2
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cobra v0.0.5
@@ -18,7 +17,7 @@ require (
github.com/spf13/viper v1.4.0
github.com/stretchr/testify v1.3.0
github.com/tidwall/redcon v1.0.0
golang.org/x/exp v0.0.0-20190718202018-cfdd5522f6f6
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa // indirect
golang.org/x/text v0.3.2 // indirect
)

27
go.sum
View File

@@ -17,12 +17,9 @@ github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d h1:TocZO8frNoxkwqFPePHFldSw8vLu+gBrlvFZYWqxiF4=
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d/go.mod h1:D6ICZm05D9VN1n/8iOtBxLpXtoGp6HDFUJ1RNVieOSE=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
@@ -35,16 +32,12 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
@@ -59,7 +52,6 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -79,8 +71,6 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg=
@@ -88,6 +78,8 @@ github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUr
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/plar/go-adaptive-radix-tree v1.0.1 h1:J+2qrXaKWLACw59s8SlTVYYxWjlUr/BlCsfkAzn96/0=
github.com/plar/go-adaptive-radix-tree v1.0.1/go.mod h1:Ot8d28EII3i7Lv4PSvBlF8ejiD/CtRYDuPsySJbSaK8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -120,11 +112,11 @@ github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmq
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
@@ -141,12 +133,11 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20190718202018-cfdd5522f6f6 h1:1xlTfQXUyQvvbx4TFsXRgoj9r0YUwcqq9XGX9u9OODU=
golang.org/x/exp v0.0.0-20190718202018-cfdd5522f6f6/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4=
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56 h1:estk1glOnSVeJ9tdEZZc5mAMDZk5lNJNyJ6DvrBkTEU=
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@@ -166,13 +157,13 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M=
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
@@ -182,13 +173,13 @@ golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846 h1:0oJP+9s5Z3MT6dym56c4f7nVeujVpL1QyD2Vp/bTql0=
golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

54
internal/config/config.go Normal file
View File

@@ -0,0 +1,54 @@
package config
import (
"encoding/json"
"io/ioutil"
"os"
)
// Config contains the bitcask configuration parameters
type Config struct {
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize uint32 `json:"max_key_size"`
MaxValueSize uint64 `json:"max_value_size"`
Sync bool `json:"sync"`
}
// Load loads a configuration from the given path
func Load(path string) (*Config, error) {
var cfg Config
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &cfg); err != nil {
return nil, err
}
return &cfg, nil
}
// Save saves the configuration to the provided path
func (c *Config) Save(path string) error {
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
return err
}
data, err := json.Marshal(c)
if err != nil {
return err
}
if _, err = f.Write(data); err != nil {
return err
}
if err = f.Sync(); err != nil {
return err
}
return f.Close()
}

View File

@@ -0,0 +1,99 @@
package codec
import (
"encoding/binary"
"io"
"github.com/pkg/errors"
"github.com/prologic/bitcask/internal"
)
var (
errInvalidKeyOrValueSize = errors.New("key/value size is invalid")
errCantDecodeOnNilEntry = errors.New("can't decode on nil entry")
errTruncatedData = errors.New("data is truncated")
)
// NewDecoder creates a streaming Entry decoder.
func NewDecoder(r io.Reader, maxKeySize uint32, maxValueSize uint64) *Decoder {
return &Decoder{
r: r,
maxKeySize: maxKeySize,
maxValueSize: maxValueSize,
}
}
// Decoder wraps an underlying io.Reader and allows you to stream
// Entry decodings on it.
type Decoder struct {
r io.Reader
maxKeySize uint32
maxValueSize uint64
}
// Decode decodes the next Entry from the current stream
func (d *Decoder) Decode(v *internal.Entry) (int64, error) {
if v == nil {
return 0, errCantDecodeOnNilEntry
}
prefixBuf := make([]byte, keySize+valueSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil {
return 0, err
}
actualKeySize, actualValueSize, err := getKeyValueSizes(prefixBuf, d.maxKeySize, d.maxValueSize)
if err != nil {
return 0, err
}
buf := make([]byte, uint64(actualKeySize)+actualValueSize+checksumSize)
if _, err = io.ReadFull(d.r, buf); err != nil {
return 0, errTruncatedData
}
decodeWithoutPrefix(buf, actualKeySize, v)
return int64(keySize + valueSize + uint64(actualKeySize) + actualValueSize + checksumSize), nil
}
// DecodeEntry decodes a serialized entry
func DecodeEntry(b []byte, e *internal.Entry, maxKeySize uint32, maxValueSize uint64) error {
valueOffset, _, err := getKeyValueSizes(b, maxKeySize, maxValueSize)
if err != nil {
return errors.Wrap(err, "key/value sizes are invalid")
}
decodeWithoutPrefix(b[keySize+valueSize:], valueOffset, e)
return nil
}
func getKeyValueSizes(buf []byte, maxKeySize uint32, maxValueSize uint64) (uint32, uint64, error) {
actualKeySize := binary.BigEndian.Uint32(buf[:keySize])
actualValueSize := binary.BigEndian.Uint64(buf[keySize:])
if actualKeySize > maxKeySize || actualValueSize > maxValueSize || actualKeySize == 0 {
return 0, 0, errInvalidKeyOrValueSize
}
return actualKeySize, actualValueSize, nil
}
func decodeWithoutPrefix(buf []byte, valueOffset uint32, v *internal.Entry) {
v.Key = buf[:valueOffset]
v.Value = buf[valueOffset : len(buf)-checksumSize]
v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:])
}
// IsCorruptedData indicates if the error correspondes to possible data corruption
func IsCorruptedData(err error) bool {
switch err {
case errCantDecodeOnNilEntry, errInvalidKeyOrValueSize, errTruncatedData:
return true
default:
return false
}
}

View File

@@ -0,0 +1,109 @@
package codec
import (
"bytes"
"encoding/binary"
"io"
"testing"
"github.com/prologic/bitcask/internal"
"github.com/stretchr/testify/assert"
)
func TestDecodeOnNilEntry(t *testing.T) {
t.Parallel()
assert := assert.New(t)
decoder := NewDecoder(&bytes.Buffer{}, 1, 1)
_, err := decoder.Decode(nil)
if assert.Error(err) {
assert.Equal(errCantDecodeOnNilEntry, err)
}
}
func TestShortPrefix(t *testing.T) {
t.Parallel()
assert := assert.New(t)
maxKeySize, maxValueSize := uint32(10), uint64(20)
prefix := make([]byte, keySize+valueSize)
binary.BigEndian.PutUint32(prefix, 1)
binary.BigEndian.PutUint64(prefix[keySize:], 1)
truncBytesCount := 2
buf := bytes.NewBuffer(prefix[:keySize+valueSize-truncBytesCount])
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
_, err := decoder.Decode(&internal.Entry{})
if assert.Error(err) {
assert.Equal(io.ErrUnexpectedEOF, err)
}
}
func TestInvalidValueKeySizes(t *testing.T) {
assert := assert.New(t)
maxKeySize, maxValueSize := uint32(10), uint64(20)
tests := []struct {
keySize uint32
valueSize uint64
name string
}{
{keySize: 0, valueSize: 5, name: "zero key size"}, //zero value size is correct for tombstones
{keySize: 11, valueSize: 5, name: "key size overflow"},
{keySize: 5, valueSize: 21, name: "value size overflow"},
{keySize: 11, valueSize: 21, name: "key and value size overflow"},
}
for i := range tests {
i := i
t.Run(tests[i].name, func(t *testing.T) {
t.Parallel()
prefix := make([]byte, keySize+valueSize)
binary.BigEndian.PutUint32(prefix, tests[i].keySize)
binary.BigEndian.PutUint64(prefix[keySize:], tests[i].valueSize)
buf := bytes.NewBuffer(prefix)
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
_, err := decoder.Decode(&internal.Entry{})
if assert.Error(err) {
assert.Equal(errInvalidKeyOrValueSize, err)
}
})
}
}
func TestTruncatedData(t *testing.T) {
assert := assert.New(t)
maxKeySize, maxValueSize := uint32(10), uint64(20)
key := []byte("foo")
value := []byte("bar")
data := make([]byte, keySize+valueSize+len(key)+len(value)+checksumSize)
binary.BigEndian.PutUint32(data, uint32(len(key)))
binary.BigEndian.PutUint64(data[keySize:], uint64(len(value)))
copy(data[keySize+valueSize:], key)
copy(data[keySize+valueSize+len(key):], value)
copy(data[keySize+valueSize+len(key)+len(value):], bytes.Repeat([]byte("0"), checksumSize))
tests := []struct {
data []byte
name string
}{
{data: data[:keySize+valueSize+len(key)-1], name: "truncated key"},
{data: data[:keySize+valueSize+len(key)+len(value)-1], name: "truncated value"},
{data: data[:keySize+valueSize+len(key)+len(value)+checksumSize-1], name: "truncated checksum"},
}
for i := range tests {
i := i
t.Run(tests[i].name, func(t *testing.T) {
t.Parallel()
buf := bytes.NewBuffer(tests[i].data)
decoder := NewDecoder(buf, maxKeySize, maxValueSize)
_, err := decoder.Decode(&internal.Entry{})
if assert.Error(err) {
assert.Equal(errTruncatedData, err)
}
})
}
}

View File

@@ -0,0 +1,57 @@
package codec
import (
"bufio"
"encoding/binary"
"io"
"github.com/pkg/errors"
"github.com/prologic/bitcask/internal"
)
const (
keySize = 4
valueSize = 8
checksumSize = 4
)
// NewEncoder creates a streaming Entry encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: bufio.NewWriter(w)}
}
// Encoder wraps an underlying io.Writer and allows you to stream
// Entry encodings on it.
type Encoder struct {
w *bufio.Writer
}
// Encode takes any Entry and streams it to the underlying writer.
// Messages are framed with a key-length and value-length prefix.
func (e *Encoder) Encode(msg internal.Entry) (int64, error) {
var bufKeyValue = make([]byte, keySize+valueSize)
binary.BigEndian.PutUint32(bufKeyValue[:keySize], uint32(len(msg.Key)))
binary.BigEndian.PutUint64(bufKeyValue[keySize:keySize+valueSize], uint64(len(msg.Value)))
if _, err := e.w.Write(bufKeyValue); err != nil {
return 0, errors.Wrap(err, "failed writing key & value length prefix")
}
if _, err := e.w.Write(msg.Key); err != nil {
return 0, errors.Wrap(err, "failed writing key data")
}
if _, err := e.w.Write(msg.Value); err != nil {
return 0, errors.Wrap(err, "failed writing value data")
}
bufChecksumSize := bufKeyValue[:checksumSize]
binary.BigEndian.PutUint32(bufChecksumSize, msg.Checksum)
if _, err := e.w.Write(bufChecksumSize); err != nil {
return 0, errors.Wrap(err, "failed writing checksum data")
}
if err := e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(keySize + valueSize + len(msg.Key) + len(msg.Value) + checksumSize), nil
}

View File

@@ -0,0 +1,29 @@
package codec
import (
"bytes"
"encoding/hex"
"testing"
"github.com/prologic/bitcask/internal"
"github.com/stretchr/testify/assert"
)
func TestEncode(t *testing.T) {
t.Parallel()
assert := assert.New(t)
var buf bytes.Buffer
encoder := NewEncoder(&buf)
_, err := encoder.Encode(internal.Entry{
Key: []byte("mykey"),
Value: []byte("myvalue"),
Checksum: 414141,
Offset: 424242,
})
expectedHex := "0000000500000000000000076d796b65796d7976616c7565000651bd"
if assert.NoError(err) {
assert.Equal(expectedHex, hex.EncodeToString(buf.Bytes()))
}
}

195
internal/data/datafile.go Normal file
View File

@@ -0,0 +1,195 @@
package data
import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/pkg/errors"
"github.com/prologic/bitcask/internal"
"github.com/prologic/bitcask/internal/data/codec"
"golang.org/x/exp/mmap"
)
const (
defaultDatafileFilename = "%09d.data"
)
var (
errReadonly = errors.New("error: read only datafile")
errReadError = errors.New("error: read error")
mxMemPool sync.RWMutex
)
// Datafile is an interface that represents a readable and writeable datafile
type Datafile interface {
FileID() int
Name() string
Close() error
Sync() error
Size() int64
Read() (internal.Entry, int64, error)
ReadAt(index, size int64) (internal.Entry, error)
Write(internal.Entry) (int64, int64, error)
}
type datafile struct {
sync.RWMutex
id int
r *os.File
ra *mmap.ReaderAt
w *os.File
offset int64
dec *codec.Decoder
enc *codec.Encoder
maxKeySize uint32
maxValueSize uint64
}
// NewDatafile opens an existing datafile
func NewDatafile(path string, id int, readonly bool, maxKeySize uint32, maxValueSize uint64) (Datafile, error) {
var (
r *os.File
ra *mmap.ReaderAt
w *os.File
err error
)
fn := filepath.Join(path, fmt.Sprintf(defaultDatafileFilename, id))
if !readonly {
w, err = os.OpenFile(fn, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
if err != nil {
return nil, err
}
}
r, err = os.Open(fn)
if err != nil {
return nil, err
}
stat, err := r.Stat()
if err != nil {
return nil, errors.Wrap(err, "error calling Stat()")
}
ra, err = mmap.Open(fn)
if err != nil {
return nil, err
}
offset := stat.Size()
dec := codec.NewDecoder(r, maxKeySize, maxValueSize)
enc := codec.NewEncoder(w)
return &datafile{
id: id,
r: r,
ra: ra,
w: w,
offset: offset,
dec: dec,
enc: enc,
maxKeySize: maxKeySize,
maxValueSize: maxValueSize,
}, nil
}
func (df *datafile) FileID() int {
return df.id
}
func (df *datafile) Name() string {
return df.r.Name()
}
func (df *datafile) Close() error {
defer func() {
df.ra.Close()
df.r.Close()
}()
// Readonly datafile -- Nothing further to close on the write side
if df.w == nil {
return nil
}
err := df.Sync()
if err != nil {
return err
}
return df.w.Close()
}
func (df *datafile) Sync() error {
if df.w == nil {
return nil
}
return df.w.Sync()
}
func (df *datafile) Size() int64 {
df.RLock()
defer df.RUnlock()
return df.offset
}
// Read reads the next entry from the datafile
func (df *datafile) Read() (e internal.Entry, n int64, err error) {
df.Lock()
defer df.Unlock()
n, err = df.dec.Decode(&e)
if err != nil {
return
}
return
}
// ReadAt the entry located at index offset with expected serialized size
func (df *datafile) ReadAt(index, size int64) (e internal.Entry, err error) {
var n int
b := make([]byte, size)
if df.w == nil {
n, err = df.ra.ReadAt(b, index)
} else {
n, err = df.r.ReadAt(b, index)
}
if err != nil {
return
}
if int64(n) != size {
err = errReadError
return
}
codec.DecodeEntry(b, &e, df.maxKeySize, df.maxValueSize)
return
}
func (df *datafile) Write(e internal.Entry) (int64, int64, error) {
if df.w == nil {
return -1, 0, errReadonly
}
df.Lock()
defer df.Unlock()
e.Offset = df.offset
n, err := df.enc.Encode(e)
if err != nil {
return -1, 0, err
}
df.offset += n
return e.Offset, n, nil
}

View File

@@ -1,205 +0,0 @@
package internal
import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/oxtoacart/bpool"
"github.com/pkg/errors"
"golang.org/x/exp/mmap"
"github.com/gogo/protobuf/proto"
pb "github.com/prologic/bitcask/internal/proto"
"github.com/prologic/bitcask/internal/streampb"
)
const (
DefaultDatafileFilename = "%09d.data"
prefixSize = 8
)
var (
ErrReadonly = errors.New("error: read only datafile")
ErrReadError = errors.New("error: read error")
memPool *bpool.BufferPool
mxMemPool sync.RWMutex
)
type Datafile struct {
sync.RWMutex
id int
r *os.File
ra *mmap.ReaderAt
w *os.File
offset int64
dec *streampb.Decoder
enc *streampb.Encoder
}
func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
var (
r *os.File
ra *mmap.ReaderAt
w *os.File
err error
)
fn := filepath.Join(path, fmt.Sprintf(DefaultDatafileFilename, id))
if !readonly {
w, err = os.OpenFile(fn, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
if err != nil {
return nil, err
}
}
r, err = os.Open(fn)
if err != nil {
return nil, err
}
stat, err := r.Stat()
if err != nil {
return nil, errors.Wrap(err, "error calling Stat()")
}
ra, err = mmap.Open(fn)
if err != nil {
return nil, err
}
offset := stat.Size()
dec := streampb.NewDecoder(r)
enc := streampb.NewEncoder(w)
return &Datafile{
id: id,
r: r,
ra: ra,
w: w,
offset: offset,
dec: dec,
enc: enc,
}, nil
}
func (df *Datafile) FileID() int {
return df.id
}
func (df *Datafile) Name() string {
return df.r.Name()
}
func (df *Datafile) Close() error {
defer func() {
df.ra.Close()
df.r.Close()
}()
// Readonly Datafile -- Nothing further to close on the write side
if df.w == nil {
return nil
}
err := df.Sync()
if err != nil {
return err
}
return df.w.Close()
}
func (df *Datafile) Sync() error {
if df.w == nil {
return nil
}
return df.w.Sync()
}
func (df *Datafile) Size() int64 {
df.RLock()
defer df.RUnlock()
return df.offset
}
func (df *Datafile) Read() (e pb.Entry, n int64, err error) {
df.Lock()
defer df.Unlock()
n, err = df.dec.Decode(&e)
if err != nil {
return
}
return
}
func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
var n int
var b []byte
if memPool == nil {
b = make([]byte, size)
} else {
poolSlice := memPool.Get()
if poolSlice.Cap() < int(size) {
poolSlice.Grow(int(size) - poolSlice.Cap())
}
defer memPool.Put(poolSlice)
b = poolSlice.Bytes()[:size]
}
if df.w == nil {
n, err = df.ra.ReadAt(b, index)
} else {
n, err = df.r.ReadAt(b, index)
}
if err != nil {
return
}
if int64(n) != size {
err = ErrReadError
return
}
err = proto.Unmarshal(b[prefixSize:], &e)
if err != nil {
return
}
return
}
func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {
if df.w == nil {
return -1, 0, ErrReadonly
}
df.Lock()
defer df.Unlock()
e.Offset = df.offset
n, err := df.enc.Encode(&e)
if err != nil {
return -1, 0, err
}
df.offset += n
return e.Offset, n, nil
}
// ConfigureMemPool configurate the mempool accordingly
func ConfigureMemPool(maxConcurrency *int) {
mxMemPool.Lock()
defer mxMemPool.Unlock()
if maxConcurrency == nil {
memPool = nil
} else {
memPool = bpool.NewBufferPool(*maxConcurrency)
}
return
}

View File

@@ -2,14 +2,21 @@ package internal
import (
"hash/crc32"
pb "github.com/prologic/bitcask/internal/proto"
)
func NewEntry(key string, value []byte) pb.Entry {
// Entry represents a key/value in the database
type Entry struct {
Checksum uint32
Key []byte
Offset int64
Value []byte
}
// NewEntry creates a new `Entry` with the given `key` and `value`
func NewEntry(key, value []byte) Entry {
checksum := crc32.ChecksumIEEE(value)
return pb.Entry{
return Entry{
Checksum: checksum,
Key: key,
Value: value,

View File

@@ -0,0 +1,138 @@
package index
import (
"encoding/binary"
"io"
"github.com/pkg/errors"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
var (
errTruncatedKeySize = errors.New("key size is truncated")
errTruncatedKeyData = errors.New("key data is truncated")
errTruncatedData = errors.New("data is truncated")
errKeySizeTooLarge = errors.New("key size too large")
)
const (
int32Size = 4
int64Size = 8
fileIDSize = int32Size
offsetSize = int64Size
sizeSize = int64Size
)
func readKeyBytes(r io.Reader, maxKeySize uint32) ([]byte, error) {
s := make([]byte, int32Size)
_, err := io.ReadFull(r, s)
if err != nil {
if err == io.EOF {
return nil, err
}
return nil, errors.Wrap(errTruncatedKeySize, err.Error())
}
size := binary.BigEndian.Uint32(s)
if size > uint32(maxKeySize) {
return nil, errKeySizeTooLarge
}
b := make([]byte, size)
_, err = io.ReadFull(r, b)
if err != nil {
return nil, errors.Wrap(errTruncatedKeyData, err.Error())
}
return b, nil
}
func writeBytes(b []byte, w io.Writer) error {
s := make([]byte, int32Size)
binary.BigEndian.PutUint32(s, uint32(len(b)))
_, err := w.Write(s)
if err != nil {
return err
}
_, err = w.Write(b)
if err != nil {
return err
}
return nil
}
func readItem(r io.Reader) (internal.Item, error) {
buf := make([]byte, (fileIDSize + offsetSize + sizeSize))
_, err := io.ReadFull(r, buf)
if err != nil {
return internal.Item{}, errors.Wrap(errTruncatedData, err.Error())
}
return internal.Item{
FileID: int(binary.BigEndian.Uint32(buf[:fileIDSize])),
Offset: int64(binary.BigEndian.Uint64(buf[fileIDSize:(fileIDSize + offsetSize)])),
Size: int64(binary.BigEndian.Uint64(buf[(fileIDSize + offsetSize):])),
}, nil
}
func writeItem(item internal.Item, w io.Writer) error {
buf := make([]byte, (fileIDSize + offsetSize + sizeSize))
binary.BigEndian.PutUint32(buf[:fileIDSize], uint32(item.FileID))
binary.BigEndian.PutUint64(buf[fileIDSize:(fileIDSize+offsetSize)], uint64(item.Offset))
binary.BigEndian.PutUint64(buf[(fileIDSize+offsetSize):], uint64(item.Size))
_, err := w.Write(buf)
if err != nil {
return err
}
return nil
}
// ReadIndex reads a persisted from a io.Reader into a Tree
func readIndex(r io.Reader, t art.Tree, maxKeySize uint32) error {
for {
key, err := readKeyBytes(r, maxKeySize)
if err != nil {
if err == io.EOF {
break
}
return err
}
item, err := readItem(r)
if err != nil {
return err
}
t.Insert(key, item)
}
return nil
}
func writeIndex(t art.Tree, w io.Writer) (err error) {
t.ForEach(func(node art.Node) bool {
err = writeBytes(node.Key(), w)
if err != nil {
return false
}
item := node.Value().(internal.Item)
err := writeItem(item, w)
if err != nil {
return false
}
return true
})
return
}
// IsIndexCorruption returns a boolean indicating whether the error
// is known to report a corruption data issue
func IsIndexCorruption(err error) bool {
cause := errors.Cause(err)
switch cause {
case errKeySizeTooLarge, errTruncatedData, errTruncatedKeyData, errTruncatedKeySize:
return true
}
return false
}

View File

@@ -0,0 +1,126 @@
package index
import (
"bytes"
"encoding/base64"
"encoding/binary"
"testing"
"github.com/pkg/errors"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
const (
base64SampleTree = "AAAABGFiY2QAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAARhYmNlAAAAAQAAAAAAAAABAAAAAAAAAAEAAAAEYWJjZgAAAAIAAAAAAAAAAgAAAAAAAAACAAAABGFiZ2QAAAADAAAAAAAAAAMAAAAAAAAAAw=="
)
func TestWriteIndex(t *testing.T) {
at, expectedSerializedSize := getSampleTree()
var b bytes.Buffer
err := writeIndex(at, &b)
if err != nil {
t.Fatalf("writing index failed: %v", err)
}
if b.Len() != expectedSerializedSize {
t.Fatalf("incorrect size of serialied index: expected %d, got: %d", expectedSerializedSize, b.Len())
}
sampleTreeBytes, _ := base64.StdEncoding.DecodeString(base64SampleTree)
if !bytes.Equal(b.Bytes(), sampleTreeBytes) {
t.Fatalf("unexpected serialization of the tree")
}
}
func TestReadIndex(t *testing.T) {
sampleTreeBytes, _ := base64.StdEncoding.DecodeString(base64SampleTree)
b := bytes.NewBuffer(sampleTreeBytes)
at := art.New()
err := readIndex(b, at, 1024)
if err != nil {
t.Fatalf("error while deserializing correct sample tree: %v", err)
}
atsample, _ := getSampleTree()
if atsample.Size() != at.Size() {
t.Fatalf("trees aren't the same size, expected %v, got %v", atsample.Size(), at.Size())
}
atsample.ForEach(func(node art.Node) bool {
_, found := at.Search(node.Key())
if !found {
t.Fatalf("expected node wasn't found: %s", node.Key())
}
return true
})
}
func TestReadCorruptedData(t *testing.T) {
sampleBytes, _ := base64.StdEncoding.DecodeString(base64SampleTree)
t.Run("truncated", func(t *testing.T) {
table := []struct {
name string
err error
data []byte
}{
{name: "key-size-first-item", err: errTruncatedKeySize, data: sampleBytes[:2]},
{name: "key-data-second-item", err: errTruncatedKeyData, data: sampleBytes[:6]},
{name: "key-size-second-item", err: errTruncatedKeySize, data: sampleBytes[:(int32Size+4+fileIDSize+offsetSize+sizeSize)+2]},
{name: "key-data-second-item", err: errTruncatedKeyData, data: sampleBytes[:(int32Size+4+fileIDSize+offsetSize+sizeSize)+6]},
{name: "data", err: errTruncatedData, data: sampleBytes[:int32Size+4+(fileIDSize+offsetSize+sizeSize-3)]},
}
for i := range table {
t.Run(table[i].name, func(t *testing.T) {
bf := bytes.NewBuffer(table[i].data)
if err := readIndex(bf, art.New(), 1024); !IsIndexCorruption(err) || errors.Cause(err) != table[i].err {
t.Fatalf("expected %v, got %v", table[i].err, err)
}
})
}
})
t.Run("overflow", func(t *testing.T) {
overflowKeySize := make([]byte, len(sampleBytes))
copy(overflowKeySize, sampleBytes)
binary.BigEndian.PutUint32(overflowKeySize, 1025)
overflowDataSize := make([]byte, len(sampleBytes))
copy(overflowDataSize, sampleBytes)
binary.BigEndian.PutUint32(overflowDataSize[int32Size+4+fileIDSize+offsetSize:], 1025)
table := []struct {
name string
err error
maxKeySize uint32
data []byte
}{
{name: "key-data-overflow", err: errKeySizeTooLarge, maxKeySize: 1024, data: overflowKeySize},
}
for i := range table {
t.Run(table[i].name, func(t *testing.T) {
bf := bytes.NewBuffer(table[i].data)
if err := readIndex(bf, art.New(), table[i].maxKeySize); !IsIndexCorruption(err) || errors.Cause(err) != table[i].err {
t.Fatalf("expected %v, got %v", table[i].err, err)
}
})
}
})
}
func getSampleTree() (art.Tree, int) {
at := art.New()
keys := [][]byte{[]byte("abcd"), []byte("abce"), []byte("abcf"), []byte("abgd")}
expectedSerializedSize := 0
for i := range keys {
at.Insert(keys[i], internal.Item{FileID: i, Offset: int64(i), Size: int64(i)})
expectedSerializedSize += int32Size + len(keys[i]) + fileIDSize + offsetSize + sizeSize
}
return at, expectedSerializedSize
}

59
internal/index/index.go Normal file
View File

@@ -0,0 +1,59 @@
package index
import (
"os"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
// Indexer is an interface for loading and saving the index (an Adaptive Radix Tree)
type Indexer interface {
Load(path string, maxkeySize uint32) (art.Tree, bool, error)
Save(t art.Tree, path string) error
}
// NewIndexer returns an instance of the default `Indexer` implemtnation
// which perists the index (an Adaptive Radix Tree) as a binary blob on file
func NewIndexer() Indexer {
return &indexer{}
}
type indexer struct{}
func (i *indexer) Load(path string, maxKeySize uint32) (art.Tree, bool, error) {
t := art.New()
if !internal.Exists(path) {
return t, false, nil
}
f, err := os.Open(path)
if err != nil {
return t, true, err
}
defer f.Close()
if err := readIndex(f, t, maxKeySize); err != nil {
return t, true, err
}
return t, true, nil
}
func (i *indexer) Save(t art.Tree, path string) error {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
defer f.Close()
if err := writeIndex(t, f); err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
return f.Close()
}

10
internal/item.go Normal file
View File

@@ -0,0 +1,10 @@
package internal
// Item represents the location of the value on disk. This is used by the
// internal Adaptive Radix Tree to hold an in-memory structure mapping keys to
// locations on disk of where the value(s) can be read from.
type Item struct {
FileID int `json:"fileid"`
Offset int64 `json:"offset"`
Size int64 `json:"size"`
}

View File

@@ -1,115 +0,0 @@
package internal
import (
"bytes"
"encoding/gob"
"io"
"io/ioutil"
"os"
"sync"
)
type Item struct {
FileID int
Offset int64
Size int64
}
type Keydir struct {
sync.RWMutex
kv map[string]Item
}
func NewKeydir() *Keydir {
return &Keydir{
kv: make(map[string]Item),
}
}
func (k *Keydir) Add(key string, fileid int, offset, size int64) Item {
item := Item{
FileID: fileid,
Offset: offset,
Size: size,
}
k.Lock()
k.kv[key] = item
k.Unlock()
return item
}
func (k *Keydir) Get(key string) (Item, bool) {
k.RLock()
item, ok := k.kv[key]
k.RUnlock()
return item, ok
}
func (k *Keydir) Delete(key string) {
k.Lock()
delete(k.kv, key)
k.Unlock()
}
func (k *Keydir) Len() int {
return len(k.kv)
}
func (k *Keydir) Keys() chan string {
ch := make(chan string)
go func() {
k.RLock()
for key := range k.kv {
ch <- key
}
close(ch)
k.RUnlock()
}()
return ch
}
func (k *Keydir) Bytes() ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(k.kv)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (k *Keydir) Load(fn string) error {
f, err := os.Open(fn)
if err != nil {
return err
}
defer f.Close()
dec := gob.NewDecoder(f)
if err := dec.Decode(&k.kv); err != nil {
return err
}
return nil
}
func (k *Keydir) Save(fn string) error {
data, err := k.Bytes()
if err != nil {
return err
}
return ioutil.WriteFile(fn, data, 0644)
}
func NewKeydirFromBytes(r io.Reader) (*Keydir, error) {
k := NewKeydir()
dec := gob.NewDecoder(r)
err := dec.Decode(&k.kv)
if err != nil {
return nil, err
}
return k, nil
}

158
internal/mocks/datafile.go Normal file
View File

@@ -0,0 +1,158 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import internal "github.com/prologic/bitcask/internal"
import mock "github.com/stretchr/testify/mock"
// Datafile is an autogenerated mock type for the Datafile type
type Datafile struct {
mock.Mock
}
// Close provides a mock function with given fields:
func (_m *Datafile) Close() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// FileID provides a mock function with given fields:
func (_m *Datafile) FileID() int {
ret := _m.Called()
var r0 int
if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int)
}
return r0
}
// Name provides a mock function with given fields:
func (_m *Datafile) Name() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// Read provides a mock function with given fields:
func (_m *Datafile) Read() (internal.Entry, int64, error) {
ret := _m.Called()
var r0 internal.Entry
if rf, ok := ret.Get(0).(func() internal.Entry); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(internal.Entry)
}
var r1 int64
if rf, ok := ret.Get(1).(func() int64); ok {
r1 = rf()
} else {
r1 = ret.Get(1).(int64)
}
var r2 error
if rf, ok := ret.Get(2).(func() error); ok {
r2 = rf()
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// ReadAt provides a mock function with given fields: index, size
func (_m *Datafile) ReadAt(index int64, size int64) (internal.Entry, error) {
ret := _m.Called(index, size)
var r0 internal.Entry
if rf, ok := ret.Get(0).(func(int64, int64) internal.Entry); ok {
r0 = rf(index, size)
} else {
r0 = ret.Get(0).(internal.Entry)
}
var r1 error
if rf, ok := ret.Get(1).(func(int64, int64) error); ok {
r1 = rf(index, size)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Size provides a mock function with given fields:
func (_m *Datafile) Size() int64 {
ret := _m.Called()
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}
return r0
}
// Sync provides a mock function with given fields:
func (_m *Datafile) Sync() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Write provides a mock function with given fields: _a0
func (_m *Datafile) Write(_a0 internal.Entry) (int64, int64, error) {
ret := _m.Called(_a0)
var r0 int64
if rf, ok := ret.Get(0).(func(internal.Entry) int64); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(int64)
}
var r1 int64
if rf, ok := ret.Get(1).(func(internal.Entry) int64); ok {
r1 = rf(_a0)
} else {
r1 = ret.Get(1).(int64)
}
var r2 error
if rf, ok := ret.Get(2).(func(internal.Entry) error); ok {
r2 = rf(_a0)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}

56
internal/mocks/indexer.go Normal file
View File

@@ -0,0 +1,56 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import art "github.com/plar/go-adaptive-radix-tree"
import mock "github.com/stretchr/testify/mock"
// Indexer is an autogenerated mock type for the Indexer type
type Indexer struct {
mock.Mock
}
// Load provides a mock function with given fields: path, maxkeySize
func (_m *Indexer) Load(path string, maxkeySize uint32) (art.Tree, bool, error) {
ret := _m.Called(path, maxkeySize)
var r0 art.Tree
if rf, ok := ret.Get(0).(func(string, uint32) art.Tree); ok {
r0 = rf(path, maxkeySize)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(art.Tree)
}
}
var r1 bool
if rf, ok := ret.Get(1).(func(string, uint32) bool); ok {
r1 = rf(path, maxkeySize)
} else {
r1 = ret.Get(1).(bool)
}
var r2 error
if rf, ok := ret.Get(2).(func(string, uint32) error); ok {
r2 = rf(path, maxkeySize)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// Save provides a mock function with given fields: t, path
func (_m *Indexer) Save(t art.Tree, path string) error {
ret := _m.Called(t, path)
var r0 error
if rf, ok := ret.Get(0).(func(art.Tree, string) error); ok {
r0 = rf(t, path)
} else {
r0 = ret.Error(0)
}
return r0
}

View File

@@ -1,3 +0,0 @@
package proto
//go:generate protoc --go_out=. entry.proto

View File

@@ -1,99 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: entry.proto
package proto
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Entry struct {
Checksum uint32 `protobuf:"varint,1,opt,name=Checksum,proto3" json:"Checksum,omitempty"`
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
Offset int64 `protobuf:"varint,3,opt,name=Offset,proto3" json:"Offset,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Entry) Reset() { *m = Entry{} }
func (m *Entry) String() string { return proto.CompactTextString(m) }
func (*Entry) ProtoMessage() {}
func (*Entry) Descriptor() ([]byte, []int) {
return fileDescriptor_entry_db5b99f271e6b4b6, []int{0}
}
func (m *Entry) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Entry.Unmarshal(m, b)
}
func (m *Entry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Entry.Marshal(b, m, deterministic)
}
func (dst *Entry) XXX_Merge(src proto.Message) {
xxx_messageInfo_Entry.Merge(dst, src)
}
func (m *Entry) XXX_Size() int {
return xxx_messageInfo_Entry.Size(m)
}
func (m *Entry) XXX_DiscardUnknown() {
xxx_messageInfo_Entry.DiscardUnknown(m)
}
var xxx_messageInfo_Entry proto.InternalMessageInfo
func (m *Entry) GetChecksum() uint32 {
if m != nil {
return m.Checksum
}
return 0
}
func (m *Entry) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
func (m *Entry) GetOffset() int64 {
if m != nil {
return m.Offset
}
return 0
}
func (m *Entry) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func init() {
proto.RegisterType((*Entry)(nil), "proto.Entry")
}
func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_db5b99f271e6b4b6) }
var fileDescriptor_entry_db5b99f271e6b4b6 = []byte{
// 126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xc9, 0x5c, 0xac, 0xae,
0x20, 0x51, 0x21, 0x29, 0x2e, 0x0e, 0xe7, 0x8c, 0xd4, 0xe4, 0xec, 0xe2, 0xd2, 0x5c, 0x09, 0x46,
0x05, 0x46, 0x0d, 0xde, 0x20, 0x38, 0x5f, 0x48, 0x80, 0x8b, 0xd9, 0x3b, 0xb5, 0x52, 0x82, 0x49,
0x81, 0x51, 0x83, 0x33, 0x08, 0xc4, 0x14, 0x12, 0xe3, 0x62, 0xf3, 0x4f, 0x4b, 0x2b, 0x4e, 0x2d,
0x91, 0x60, 0x56, 0x60, 0xd4, 0x60, 0x0e, 0x82, 0xf2, 0x84, 0x44, 0xb8, 0x58, 0xc3, 0x12, 0x73,
0x4a, 0x53, 0x25, 0x58, 0x14, 0x18, 0x35, 0x78, 0x82, 0x20, 0x9c, 0x24, 0x36, 0xb0, 0x5d, 0xc6,
0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x76, 0xd2, 0x3e, 0x83, 0x81, 0x00, 0x00, 0x00,
}

View File

@@ -1,10 +0,0 @@
syntax = "proto3";
package proto;
message Entry {
uint32 Checksum = 1;
string Key = 2;
int64 Offset = 3;
bytes Value = 4;
}

View File

@@ -1,97 +0,0 @@
package streampb
import (
"bufio"
"encoding/binary"
"io"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)
const (
// prefixSize is the number of bytes we preallocate for storing
// our big endian lenth prefix buffer.
prefixSize = 8
)
// NewEncoder creates a streaming protobuf encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: bufio.NewWriter(w)}
}
// Encoder wraps an underlying io.Writer and allows you to stream
// proto encodings on it.
type Encoder struct {
w *bufio.Writer
}
// Encode takes any proto.Message and streams it to the underlying writer.
// Messages are framed with a length prefix.
func (e *Encoder) Encode(msg proto.Message) (int64, error) {
prefixBuf := make([]byte, prefixSize)
buf, err := proto.Marshal(msg)
if err != nil {
return 0, err
}
binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf)))
if _, err := e.w.Write(prefixBuf); err != nil {
return 0, errors.Wrap(err, "failed writing length prefix")
}
n, err := e.w.Write(buf)
if err != nil {
return 0, errors.Wrap(err, "failed writing marshaled data")
}
if err = e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(n + prefixSize), nil
}
// NewDecoder creates a streaming protobuf decoder.
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{r: r}
}
// Decoder wraps an underlying io.Reader and allows you to stream
// proto decodings on it.
type Decoder struct {
r io.Reader
}
// Decode takes a proto.Message and unmarshals the next payload in the
// underlying io.Reader. It returns an EOF when it's done.
func (d *Decoder) Decode(v proto.Message) (int64, error) {
prefixBuf := make([]byte, prefixSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil {
return 0, err
}
n := binary.BigEndian.Uint64(prefixBuf)
buf := make([]byte, n)
idx := uint64(0)
for idx < n {
m, err := d.r.Read(buf[idx:n])
if err != nil {
return 0, errors.Wrap(translateError(err), "failed reading marshaled data")
}
idx += uint64(m)
}
return int64(idx + prefixSize), proto.Unmarshal(buf[:n], v)
}
func translateError(err error) error {
if err == io.EOF {
return io.ErrUnexpectedEOF
}
return err
}

View File

@@ -9,11 +9,14 @@ import (
"strings"
)
// Exists returns `true` if the given `path` on the current file system exists
func Exists(path string) bool {
_, err := os.Stat(path)
return err == nil
}
// DirSize returns the space occupied by the given `path` on disk on the current
// file system.
func DirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
@@ -28,6 +31,9 @@ func DirSize(path string) (int64, error) {
return size, err
}
// GetDatafiles returns a list of all data files stored in the database path
// given by `path`. All datafiles are identified by the the glob `*.data` and
// the basename is represented by an monotomic increasing integer.
func GetDatafiles(path string) ([]string, error) {
fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
if err != nil {
@@ -37,6 +43,8 @@ func GetDatafiles(path string) ([]string, error) {
return fns, nil
}
// ParseIds will parse a list of datafiles as returned by `GetDatafiles` and
// extract the id part and return a slice of ints.
func ParseIds(fns []string) ([]int, error) {
var ids []int
for _, fn := range fns {

View File

@@ -1,115 +1,62 @@
package bitcask
import (
"encoding/json"
"errors"
"io/ioutil"
"path/filepath"
)
import "github.com/prologic/bitcask/internal/config"
const (
// DefaultMaxDatafileSize is the default maximum datafile size in bytes
DefaultMaxDatafileSize = 1 << 20 // 1MB
// DefaultMaxKeySize is the default maximum key size in bytes
DefaultMaxKeySize = 64 // 64 bytes
DefaultMaxKeySize = uint32(64) // 64 bytes
// DefaultMaxValueSize is the default value size in bytes
DefaultMaxValueSize = 1 << 16 // 65KB
)
DefaultMaxValueSize = uint64(1 << 16) // 65KB
var (
// ErrMaxConcurrencyLowerEqZero is the error returned for
// maxConcurrency option not greater than zero
ErrMaxConcurrencyLowerEqZero = errors.New("error: maxConcurrency must be greater than zero")
// DefaultSync is the default file synchronization action
DefaultSync = false
)
// Option is a function that takes a config struct and modifies it
type Option func(*config) error
type config struct {
maxDatafileSize int
maxKeySize int
maxValueSize int
maxConcurrency *int
}
func (c *config) MarshalJSON() ([]byte, error) {
return json.Marshal(struct {
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
}{
MaxDatafileSize: c.maxDatafileSize,
MaxKeySize: c.maxKeySize,
MaxValueSize: c.maxValueSize,
})
}
func getConfig(path string) (*config, error) {
type Config struct {
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
}
var cfg Config
data, err := ioutil.ReadFile(filepath.Join(path, "config.json"))
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &cfg); err != nil {
return nil, err
}
return &config{
maxDatafileSize: cfg.MaxDatafileSize,
maxKeySize: cfg.MaxKeySize,
maxValueSize: cfg.MaxValueSize,
}, nil
}
func newDefaultConfig() *config {
return &config{
maxDatafileSize: DefaultMaxDatafileSize,
maxKeySize: DefaultMaxKeySize,
maxValueSize: DefaultMaxValueSize,
}
}
type Option func(*config.Config) error
// WithMaxDatafileSize sets the maximum datafile size option
func WithMaxDatafileSize(size int) Option {
return func(cfg *config) error {
cfg.maxDatafileSize = size
return func(cfg *config.Config) error {
cfg.MaxDatafileSize = size
return nil
}
}
// WithMaxKeySize sets the maximum key size option
func WithMaxKeySize(size int) Option {
return func(cfg *config) error {
cfg.maxKeySize = size
func WithMaxKeySize(size uint32) Option {
return func(cfg *config.Config) error {
cfg.MaxKeySize = size
return nil
}
}
// WithMaxValueSize sets the maximum value size option
func WithMaxValueSize(size int) Option {
return func(cfg *config) error {
cfg.maxValueSize = size
func WithMaxValueSize(size uint64) Option {
return func(cfg *config.Config) error {
cfg.MaxValueSize = size
return nil
}
}
// WithMemPool configures usage of a memory pool to avoid allocations
func WithMemPool(maxConcurrency int) Option {
return func(cfg *config) error {
if maxConcurrency <= 0 {
return ErrMaxConcurrencyLowerEqZero
}
cfg.maxConcurrency = &maxConcurrency
// WithSync causes Sync() to be called on every key/value written increasing
// durability and safety at the expense of performance
func WithSync(sync bool) Option {
return func(cfg *config.Config) error {
cfg.Sync = sync
return nil
}
}
func newDefaultConfig() *config.Config {
return &config.Config{
MaxDatafileSize: DefaultMaxDatafileSize,
MaxKeySize: DefaultMaxKeySize,
MaxValueSize: DefaultMaxValueSize,
Sync: DefaultSync,
}
}