Compare commits

..

21 Commits

Author SHA1 Message Date
Ignacio Hagopian
0338755f8c fix readme typos (#81) 2019-09-02 10:22:08 +10:00
Ignacio Hagopian
877bf982b1 fix go vet (#80) 2019-09-02 10:20:56 +10:00
James Mills
abbbeb8e1d Replace keydir with ART trie (#75)
* Replace keydir with ART trie

* Address some review feedback

* Address review feedback (consts)
2019-09-02 08:38:56 +10:00
James Mills
36bc134b22 Fix a bug wit the decoder passing the wrong value for the value's offset into the buffer (#77) 2019-08-31 08:35:17 +10:00
James Mills
ea96b8afc0 Update README.md 2019-08-30 13:28:17 +10:00
James Mills
b3d6f734b6 Use an Adaptive Radix Tree (#71) 2019-08-30 08:13:24 +10:00
Ignacio Hagopian
55459a5c93 readme: fix stargazers link (#73) 2019-08-30 08:05:08 +10:00
James Mills
a20ee3e3d4 Update README.md 2019-08-26 08:45:15 +10:00
James Mills
cd27b84069 Create FUNDING.yml 2019-08-25 12:01:15 +10:00
James Mills
b28353de02 Update README.md 2019-08-18 22:34:14 +10:00
Awn
e8bee948bc Make optimised scan functionality optional (#68) 2019-08-16 10:51:59 +10:00
Xin Zhang
156d29e344 Fix typo (#65) 2019-08-14 20:53:08 +10:00
James Mills
c5a565cd82 Adds WithSync(...) option to turn on sync after write durability (#63)
* Added WithSync(...) option to turn  on sync after write durability

* Add Sync/NoSync benchmark variants for Put()
2019-08-12 06:47:46 +10:00
Ignacio Hagopian
8f56cffd86 codec: collapse write and save extra alloc (#64) 2019-08-12 06:47:26 +10:00
James Mills
7204a33512 Fix and cleanup some unnecessary internal sub-packages and duplication 2019-08-08 22:28:25 +10:00
James Mills
c7d101d34f Fixed missing model import for codec with undefined Entry 2019-08-08 20:14:27 +10:00
Awn
af43cfa8f1 Remove merge function (#60)
* tidy: clean up some leftovers

Fixes #56
Fixes #57
Fixes #58

* api: remove standalone merge function

Fixes #55
2019-08-08 19:51:45 +10:00
James Mills
110c5024ee Update README.md 2019-08-08 12:17:42 +10:00
Ignacio Hagopian
1f10b4026d inline hash function to save extra alloc (#53) 2019-08-08 09:52:31 +10:00
Ignacio Hagopian
fd179b4a86 custom high-performance encoder implementation (#52) 2019-08-08 09:21:46 +10:00
James Mills
755b1879b5 Use []byte byte slices as keys directly avoiding serialing string(s) (#46) (#51) 2019-08-08 08:14:48 +10:00
27 changed files with 524 additions and 622 deletions

2
.github/FUNDING.yml vendored Normal file
View File

@@ -0,0 +1,2 @@
github: prologic
patreon: prologic

View File

@@ -5,6 +5,7 @@
[![Go Report Card](https://goreportcard.com/badge/prologic/bitcask)](https://goreportcard.com/report/prologic/bitcask)
[![GoDoc](https://godoc.org/github.com/prologic/bitcask?status.svg)](https://godoc.org/github.com/prologic/bitcask)
[![GitHub license](https://img.shields.io/github/license/prologic/bitcask.svg)](https://github.com/prologic/bitcask)
[![Sourcegraph](https://sourcegraph.com/github.com/prologic/bitcask/-/badge.svg)](https://sourcegraph.com/github.com/prologic/bitcask?badge)
A high performance Key/Value store written in [Go](https://golang.org) with a predictable read/write performance and high throughput. Uses a [Bitcask](https://en.wikipedia.org/wiki/Bitcask) on-disk layout (LSM+WAL) similar to [Riak](https://riak.com/)
@@ -21,27 +22,11 @@ For a more feature-complete Redis-compatible server, distributed key/value store
## Development
1. Get the source
```#!sh
$ git clone https://github.com/prologic/bitcask.git
```
2. Install required tools
This library uses [Protobuf](https://github.com/protocolbuffers/protobuf) to serialize data on disk. Please follow the
instructions for installing `protobuf` on your system. You will also need the
following Go libraries/tools to generate Go code from Protobuf defs:
- [protoc-gen-go](https://github.com/golang/protobuf)
3. Build the project
```#!sh
$ make
```
This will invoke `go generate` and `go build`.
## Install
```#!sh
@@ -172,9 +157,21 @@ For 128B values:
The full benchmark above shows linear performance as you increase key/value sizes. Memory pooling starts to become advantageous for larger values.
## Stargazers over time
[![Stargazers over time](https://starcharts.herokuapp.com/prologic/bitcask.svg)](https://starcharts.herokuapp.com/prologic/bitcask)
## Support
Support the ongoing development of Bitcask!
**Sponser**
- Become a [Sponser](https://www.patreon.com/prologic)
## Contributors
Thank you to all those that have contributed to this project, battle-tested it, used it in their own projects or pdocuts, fixed bugs, improved performance and even fix tiny tpyos in documentation! Thank you and keep contirbuting!
Thank you to all those that have contributed to this project, battle-tested it, used it in their own projects or products, fixed bugs, improved performance and even fix tiny typos in documentation! Thank you and keep contributing!
You can find an [AUTHORS](/AUTHORS) file where we keep a list of contributors to the project. If you contriibute a PR please consider adding your name there. There is also Github's own [Contributors](https://github.com/prologic/bitcask/graphs/contributors) statistics.

View File

@@ -11,9 +11,8 @@ import (
"path/filepath"
"sync"
"github.com/derekparker/trie"
"github.com/gofrs/flock"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
@@ -36,10 +35,6 @@ var (
// ErrDatabaseLocked is the error returned if the database is locked
// (typically opened by another process)
ErrDatabaseLocked = errors.New("error: database locked")
// ErrCreatingMemPool is the error returned when trying to configurate
// the mempool fails
ErrCreatingMemPool = errors.New("error: creating the mempool failed")
)
// Bitcask is a struct that represents a on-disk LSM and WAL data structure
@@ -54,9 +49,8 @@ type Bitcask struct {
options []Option
path string
curr *internal.Datafile
keydir *internal.Keydir
datafiles map[int]*internal.Datafile
trie *trie.Trie
trie art.Tree
}
// Stats is a struct returned by Stats() on an open Bitcask instance
@@ -77,7 +71,9 @@ func (b *Bitcask) Stats() (stats Stats, err error) {
}
stats.Datafiles = len(b.datafiles)
stats.Keys = b.keydir.Len()
b.mu.RLock()
stats.Keys = b.trie.Size()
b.mu.RUnlock()
stats.Size = size
return
@@ -92,7 +88,16 @@ func (b *Bitcask) Close() error {
os.Remove(b.Flock.Path())
}()
if err := b.keydir.Save(path.Join(b.path, "index")); err != nil {
f, err := os.OpenFile(filepath.Join(b.path, "index"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
defer f.Close()
if err := internal.WriteIndex(b.trie, f); err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
@@ -112,14 +117,18 @@ func (b *Bitcask) Sync() error {
// Get retrieves the value of the given key. If the key is not found or an/I/O
// error occurs a null byte slice is returned along with the error.
func (b *Bitcask) Get(key string) ([]byte, error) {
func (b *Bitcask) Get(key []byte) ([]byte, error) {
var df *internal.Datafile
item, ok := b.keydir.Get(key)
if !ok {
b.mu.RLock()
value, found := b.trie.Search(key)
b.mu.RUnlock()
if !found {
return nil, ErrKeyNotFound
}
item := value.(internal.Item)
if item.FileID == b.curr.FileID() {
df = b.curr
} else {
@@ -140,13 +149,15 @@ func (b *Bitcask) Get(key string) ([]byte, error) {
}
// Has returns true if the key exists in the database, false otherwise.
func (b *Bitcask) Has(key string) bool {
_, ok := b.keydir.Get(key)
return ok
func (b *Bitcask) Has(key []byte) bool {
b.mu.RLock()
_, found := b.trie.Search(key)
b.mu.RUnlock()
return found
}
// Put stores the key and value in the database.
func (b *Bitcask) Put(key string, value []byte) error {
func (b *Bitcask) Put(key, value []byte) error {
if len(key) > b.config.maxKeySize {
return ErrKeyTooLarge
}
@@ -159,22 +170,31 @@ func (b *Bitcask) Put(key string, value []byte) error {
return err
}
item := b.keydir.Add(key, b.curr.FileID(), offset, n)
b.trie.Add(key, item)
if b.config.sync {
if err := b.curr.Sync(); err != nil {
return err
}
}
item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
b.mu.Lock()
b.trie.Insert(key, item)
b.mu.Unlock()
return nil
}
// Delete deletes the named key. If the key doesn't exist or an I/O error
// occurs the error is returned.
func (b *Bitcask) Delete(key string) error {
func (b *Bitcask) Delete(key []byte) error {
_, _, err := b.put(key, []byte{})
if err != nil {
return err
}
b.keydir.Delete(key)
b.trie.Remove(key)
b.mu.Lock()
b.trie.Delete(key)
b.mu.Unlock()
return nil
}
@@ -182,39 +202,69 @@ func (b *Bitcask) Delete(key string) error {
// Scan performs a prefix scan of keys matching the given prefix and calling
// the function `f` with the keys found. If the function returns an error
// no further keys are processed and the first error returned.
func (b *Bitcask) Scan(prefix string, f func(key string) error) error {
keys := b.trie.PrefixSearch(prefix)
for _, key := range keys {
if err := f(key); err != nil {
return err
func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
// Skip the root node
if len(node.Key()) == 0 {
return true
}
}
return nil
if err = f(node.Key()); err != nil {
return false
}
return true
})
return
}
// Len returns the total number of keys in the database
func (b *Bitcask) Len() int {
return b.keydir.Len()
b.mu.RLock()
defer b.mu.RUnlock()
return b.trie.Size()
}
// Keys returns all keys in the database as a channel of string(s)
func (b *Bitcask) Keys() chan string {
return b.keydir.Keys()
// Keys returns all keys in the database as a channel of keys
func (b *Bitcask) Keys() chan []byte {
ch := make(chan []byte)
go func() {
b.mu.RLock()
defer b.mu.RUnlock()
for it := b.trie.Iterator(); it.HasNext(); {
node, _ := it.Next()
// Skip the root node
if len(node.Key()) == 0 {
continue
}
ch <- node.Key()
}
close(ch)
}()
return ch
}
// Fold iterates over all keys in the database calling the function `f` for
// each key. If the function returns an error, no further keys are processed
// and the error returned.
func (b *Bitcask) Fold(f func(key string) error) error {
for key := range b.keydir.Keys() {
if err := f(key); err != nil {
return err
func (b *Bitcask) Fold(f func(key []byte) error) error {
b.mu.RLock()
defer b.mu.RUnlock()
b.trie.ForEach(func(node art.Node) bool {
if err := f(node.Key()); err != nil {
return false
}
}
return true
})
return nil
}
func (b *Bitcask) put(key string, value []byte) (int64, int64, error) {
func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
b.mu.Lock()
defer b.mu.Unlock()
@@ -292,16 +342,17 @@ func (b *Bitcask) reopen() error {
datafiles[id] = df
}
keydir := internal.NewKeydir()
trie := trie.New()
t := art.New()
if internal.Exists(path.Join(b.path, "index")) {
if err := keydir.Load(path.Join(b.path, "index")); err != nil {
f, err := os.Open(path.Join(b.path, "index"))
if err != nil {
return err
}
for key := range keydir.Keys() {
item, _ := keydir.Get(key)
trie.Add(key, item)
defer f.Close()
if err := internal.ReadIndex(f, t); err != nil {
return err
}
} else {
for i, df := range datafiles {
@@ -316,12 +367,12 @@ func (b *Bitcask) reopen() error {
// Tombstone value (deleted key)
if len(e.Value) == 0 {
keydir.Delete(e.Key)
t.Delete(e.Key)
continue
}
item := keydir.Add(e.Key, ids[i], e.Offset, n)
trie.Add(e.Key, item)
item := internal.Item{FileID: ids[i], Offset: e.Offset, Size: n}
t.Insert(e.Key, item)
}
}
}
@@ -336,13 +387,10 @@ func (b *Bitcask) reopen() error {
return err
}
b.trie = t
b.curr = curr
b.datafiles = datafiles
b.keydir = keydir
b.trie = trie
return nil
}
@@ -366,7 +414,7 @@ func (b *Bitcask) Merge() error {
// Rewrite all key/value pairs into merged database
// Doing this automatically strips deleted keys and
// old key/value pairs
err = b.Fold(func(key string) error {
err = b.Fold(func(key []byte) error {
value, err := b.Get(key)
if err != nil {
return err
@@ -457,8 +505,6 @@ func Open(path string, options ...Option) (*Bitcask, error) {
}
}
internal.ConfigureMemPool(bitcask.config.maxConcurrency)
locked, err := bitcask.Flock.TryLock()
if err != nil {
return nil, err
@@ -478,16 +524,3 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return bitcask, nil
}
// Merge calls Bitcask.Merge()
// XXX: Deprecated; Please use the `.Merge()` method
// XXX: This is only kept here for backwards compatibility
// it will be removed in future releases at some point
func Merge(path string, force bool) error {
db, err := Open(path)
if err != nil {
return err
}
return db.Merge()
}

View File

@@ -1,6 +1,7 @@
package bitcask
import (
"bytes"
"fmt"
"io/ioutil"
"os"
@@ -13,6 +14,32 @@ import (
"github.com/stretchr/testify/assert"
)
type sortByteArrays [][]byte
func (b sortByteArrays) Len() int {
return len(b)
}
func (b sortByteArrays) Less(i, j int) bool {
switch bytes.Compare(b[i], b[j]) {
case -1:
return true
case 0, 1:
return false
}
return false
}
func (b sortByteArrays) Swap(i, j int) {
b[j], b[i] = b[i], b[j]
}
func SortByteArrays(src [][]byte) [][]byte {
sorted := sortByteArrays(src)
sort.Sort(sorted)
return sorted
}
func TestAll(t *testing.T) {
var (
db *Bitcask
@@ -31,12 +58,12 @@ func TestAll(t *testing.T) {
})
t.Run("Put", func(t *testing.T) {
err = db.Put("foo", []byte("bar"))
err = db.Put([]byte([]byte("foo")), []byte("bar"))
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
val, err := db.Get("foo")
val, err := db.Get([]byte("foo"))
assert.NoError(err)
assert.Equal([]byte("bar"), val)
})
@@ -46,24 +73,24 @@ func TestAll(t *testing.T) {
})
t.Run("Has", func(t *testing.T) {
assert.True(db.Has("foo"))
assert.True(db.Has([]byte("foo")))
})
t.Run("Keys", func(t *testing.T) {
keys := make([]string, 0)
keys := make([][]byte, 0)
for key := range db.Keys() {
keys = append(keys, key)
}
assert.Equal([]string{"foo"}, keys)
assert.Equal([][]byte{[]byte("foo")}, keys)
})
t.Run("Fold", func(t *testing.T) {
var (
keys []string
keys [][]byte
values [][]byte
)
err := db.Fold(func(key string) error {
err := db.Fold(func(key []byte) error {
value, err := db.Get(key)
if err != nil {
return err
@@ -73,14 +100,14 @@ func TestAll(t *testing.T) {
return nil
})
assert.NoError(err)
assert.Equal([]string{"foo"}, keys)
assert.Equal([][]byte{[]byte("foo")}, keys)
assert.Equal([][]byte{[]byte("bar")}, values)
})
t.Run("Delete", func(t *testing.T) {
err := db.Delete("foo")
err := db.Delete([]byte("foo"))
assert.NoError(err)
_, err = db.Get("foo")
_, err = db.Get([]byte("foo"))
assert.Error(err)
assert.Equal(ErrKeyNotFound, err)
})
@@ -114,20 +141,20 @@ func TestDeletedKeys(t *testing.T) {
})
t.Run("Put", func(t *testing.T) {
err = db.Put("foo", []byte("bar"))
err = db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
val, err := db.Get("foo")
val, err := db.Get([]byte("foo"))
assert.NoError(err)
assert.Equal([]byte("bar"), val)
})
t.Run("Delete", func(t *testing.T) {
err := db.Delete("foo")
err := db.Delete([]byte("foo"))
assert.NoError(err)
_, err = db.Get("foo")
_, err = db.Get([]byte("foo"))
assert.Error(err)
assert.Equal(ErrKeyNotFound, err)
})
@@ -155,7 +182,7 @@ func TestDeletedKeys(t *testing.T) {
})
t.Run("Get", func(t *testing.T) {
_, err = db.Get("foo")
_, err = db.Get([]byte("foo"))
assert.Error(err)
assert.Equal(ErrKeyNotFound, err)
})
@@ -181,7 +208,7 @@ func TestMaxKeySize(t *testing.T) {
})
t.Run("Put", func(t *testing.T) {
key := strings.Repeat(" ", 17)
key := []byte(strings.Repeat(" ", 17))
value := []byte("foobar")
err = db.Put(key, value)
assert.Error(err)
@@ -203,7 +230,7 @@ func TestMaxValueSize(t *testing.T) {
})
t.Run("Put", func(t *testing.T) {
key := "foo"
key := []byte("foo")
value := []byte(strings.Repeat(" ", 17))
err = db.Put(key, value)
assert.Error(err)
@@ -229,12 +256,12 @@ func TestStats(t *testing.T) {
})
t.Run("Put", func(t *testing.T) {
err := db.Put("foo", []byte("bar"))
err := db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
val, err := db.Get("foo")
val, err := db.Get([]byte("foo"))
assert.NoError(err)
assert.Equal([]byte("bar"), val)
})
@@ -276,7 +303,7 @@ func TestMerge(t *testing.T) {
})
t.Run("Put", func(t *testing.T) {
err := db.Put("foo", []byte("bar"))
err := db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
})
@@ -287,7 +314,7 @@ func TestMerge(t *testing.T) {
t.Run("Put", func(t *testing.T) {
for i := 0; i < 10; i++ {
err := db.Put("foo", []byte("bar"))
err := db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
}
})
@@ -340,7 +367,7 @@ func TestConcurrent(t *testing.T) {
})
t.Run("Put", func(t *testing.T) {
err = db.Put("foo", []byte("bar"))
err = db.Put([]byte("foo"), []byte("bar"))
assert.NoError(err)
})
})
@@ -353,7 +380,7 @@ func TestConcurrent(t *testing.T) {
}()
for i := 0; i <= 100; i++ {
if i%x == 0 {
key := fmt.Sprintf("k%d", i)
key := []byte(fmt.Sprintf("k%d", i))
value := []byte(fmt.Sprintf("v%d", i))
err := db.Put(key, value)
assert.NoError(err)
@@ -377,7 +404,7 @@ func TestConcurrent(t *testing.T) {
wg.Done()
}()
for i := 0; i <= N; i++ {
value, err := db.Get("foo")
value, err := db.Get([]byte("foo"))
assert.NoError(err)
assert.Equal([]byte("bar"), value)
}
@@ -420,12 +447,12 @@ func TestScan(t *testing.T) {
"2": []byte("2"),
"3": []byte("3"),
"food": []byte("pizza"),
"foo": []byte("foo"),
"foo": []byte([]byte("foo")),
"fooz": []byte("fooz ball"),
"hello": []byte("world"),
}
for k, v := range items {
err = db.Put(k, v)
err = db.Put([]byte(k), v)
assert.NoError(err)
}
})
@@ -433,21 +460,21 @@ func TestScan(t *testing.T) {
t.Run("Scan", func(t *testing.T) {
var (
vals []string
expected = []string{
"foo",
"fooz ball",
"pizza",
vals [][]byte
expected = [][]byte{
[]byte("foo"),
[]byte("fooz ball"),
[]byte("pizza"),
}
)
err = db.Scan("fo", func(key string) error {
err = db.Scan([]byte("fo"), func(key []byte) error {
val, err := db.Get(key)
assert.NoError(err)
vals = append(vals, string(val))
vals = append(vals, val)
return nil
})
sort.Strings(vals)
vals = SortByteArrays(vals)
assert.Equal(expected, vals)
})
}
@@ -468,9 +495,8 @@ func TestLocking(t *testing.T) {
}
type benchmarkTestCase struct {
name string
size int
withPool bool
name string
size int
}
func BenchmarkGet(b *testing.B) {
@@ -486,40 +512,28 @@ func BenchmarkGet(b *testing.B) {
defer os.RemoveAll(testdir)
tests := []benchmarkTestCase{
{"128B", 128, false},
{"128BWithPool", 128, true},
{"256B", 256, false},
{"256BWithPool", 256, true},
{"512B", 512, false},
{"512BWithPool", 512, true},
{"1K", 1024, false},
{"1KWithPool", 1024, true},
{"2K", 2048, false},
{"2KWithPool", 2048, true},
{"4K", 4096, false},
{"4KWithPool", 4096, true},
{"8K", 8192, false},
{"8KWithPool", 8192, true},
{"16K", 16384, false},
{"16KWithPool", 16384, true},
{"32K", 32768, false},
{"32KWithPool", 32768, true},
{"128B", 128},
{"256B", 256},
{"512B", 512},
{"1K", 1024},
{"2K", 2048},
{"4K", 4096},
{"8K", 8192},
{"16K", 16384},
{"32K", 32768},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
b.SetBytes(int64(tt.size))
key := "foo"
key := []byte("foo")
value := []byte(strings.Repeat(" ", tt.size))
options := []Option{
WithMaxKeySize(len(key)),
WithMaxValueSize(tt.size),
}
if tt.withPool {
options = append(options, WithMemPool(1))
}
db, err := Open(testdir, options...)
if err != nil {
b.Fatal(err)
@@ -536,7 +550,7 @@ func BenchmarkGet(b *testing.B) {
if err != nil {
b.Fatal(err)
}
if string(val) != string(value) {
if !bytes.Equal(val, value) {
b.Errorf("unexpected value")
}
}
@@ -552,44 +566,54 @@ func BenchmarkPut(b *testing.B) {
b.Fatal(err)
}
testdir, err := ioutil.TempDir(currentDir, "bitcask_bench")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll(testdir)
db, err := Open(testdir)
if err != nil {
b.Fatal(err)
}
defer db.Close()
tests := []benchmarkTestCase{
{"128B", 128, false},
{"256B", 256, false},
{"512B", 512, false},
{"1K", 1024, false},
{"2K", 2048, false},
{"4K", 4096, false},
{"8K", 8192, false},
{"16K", 16384, false},
{"32K", 32768, false},
{"128B", 128},
{"256B", 256},
{"1K", 1024},
{"2K", 2048},
{"4K", 4096},
{"8K", 8192},
{"16K", 16384},
{"32K", 32768},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
b.SetBytes(int64(tt.size))
variants := map[string][]Option{
"NoSync": []Option{
WithSync(false),
},
"Sync": []Option{
WithSync(true),
},
}
key := "foo"
value := []byte(strings.Repeat(" ", tt.size))
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := db.Put(key, value)
if err != nil {
b.Fatal(err)
for name, options := range variants {
testdir, err := ioutil.TempDir(currentDir, "bitcask_bench")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll(testdir)
db, err := Open(testdir, options...)
if err != nil {
b.Fatal(err)
}
defer db.Close()
for _, tt := range tests {
b.Run(tt.name+name, func(b *testing.B) {
b.SetBytes(int64(tt.size))
key := []byte("foo")
value := []byte(strings.Repeat(" ", tt.size))
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := db.Put(key, value)
if err != nil {
b.Fatal(err)
}
}
}
})
})
}
}
}
@@ -616,30 +640,30 @@ func BenchmarkScan(b *testing.B) {
"2": []byte("2"),
"3": []byte("3"),
"food": []byte("pizza"),
"foo": []byte("foo"),
"foo": []byte([]byte("foo")),
"fooz": []byte("fooz ball"),
"hello": []byte("world"),
}
for k, v := range items {
err := db.Put(k, v)
err := db.Put([]byte(k), v)
if err != nil {
b.Fatal(err)
}
}
var expected = []string{"foo", "food", "fooz"}
var expected = [][]byte{[]byte("foo"), []byte("food"), []byte("fooz")}
b.ResetTimer()
for i := 0; i < b.N; i++ {
var keys []string
err = db.Scan("fo", func(key string) error {
var keys [][]byte
err = db.Scan([]byte("fo"), func(key []byte) error {
keys = append(keys, key)
return nil
})
if err != nil {
b.Fatal(err)
}
sort.Strings(keys)
keys = SortByteArrays(keys)
if !reflect.DeepEqual(expected, keys) {
b.Fatal(fmt.Errorf("expected keys=#%v got=%#v", expected, keys))
}

View File

@@ -37,7 +37,7 @@ func del(path, key string) int {
}
defer db.Close()
err = db.Delete(key)
err = db.Delete([]byte(key))
if err != nil {
log.WithError(err).Error("error deleting key")
return 1

View File

@@ -90,7 +90,7 @@ func export(path, output string) int {
}
}
err = db.Fold(func(key string) error {
err = db.Fold(func(key []byte) error {
value, err := db.Get(key)
if err != nil {
log.WithError(err).

View File

@@ -38,7 +38,7 @@ func get(path, key string) int {
}
defer db.Close()
value, err := db.Get(key)
value, err := db.Get([]byte(key))
if err != nil {
log.WithError(err).Error("error reading key")
return 1

View File

@@ -89,7 +89,7 @@ func _import(path, input string) int {
return 2
}
if err := db.Put(string(key), value); err != nil {
if err := db.Put(key, value); err != nil {
log.WithError(err).Error("error writing key/value")
return 2
}

View File

@@ -36,8 +36,8 @@ func keys(path string) int {
}
defer db.Close()
err = db.Fold(func(key string) error {
fmt.Printf("%s\n", key)
err = db.Fold(func(key []byte) error {
fmt.Printf("%s\n", string(key))
return nil
})
if err != nil {

View File

@@ -55,7 +55,7 @@ func put(path, key string, value io.Reader) int {
return 1
}
err = db.Put(key, data)
err = db.Put([]byte(key), data)
if err != nil {
log.WithError(err).Error("error writing key")
return 1

View File

@@ -14,7 +14,7 @@ import (
var scanCmd = &cobra.Command{
Use: "scan <prefix>",
Aliases: []string{"search", "find"},
Short: "Perform a prefis scan for keys",
Short: "Perform a prefix scan for keys",
Long: `This performa a prefix scan for keys starting with the given
prefix. This uses a Trie to search for matching keys and returns all matched
keys.`,
@@ -40,7 +40,7 @@ func scan(path, prefix string) int {
}
defer db.Close()
err = db.Scan(prefix, func(key string) error {
err = db.Scan([]byte(prefix), func(key []byte) error {
value, err := db.Get(key)
if err != nil {
log.WithError(err).Error("error reading key")

View File

@@ -76,7 +76,7 @@ func main() {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
key := cmd.Args[1]
value := cmd.Args[2]
err = db.Put(key, value)
if err != nil {
@@ -89,7 +89,7 @@ func main() {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
key := cmd.Args[1]
value, err := db.Get(key)
if err != nil {
conn.WriteNull()
@@ -106,7 +106,7 @@ func main() {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
key := cmd.Args[1]
if db.Has(key) {
conn.WriteInt(1)
} else {
@@ -117,7 +117,7 @@ func main() {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
key := string(cmd.Args[1])
key := cmd.Args[1]
err := db.Delete(key)
if err != nil {
conn.WriteInt(0)

10
doc.go
View File

@@ -1,13 +1,3 @@
// Package bitcask implements a high-performance key-value store based on a
// WAL and LSM.
//
// By default, the client assumes a default configuration regarding maximum key size,
// maximum value size, maximum datafile size, and memory pools to avoid allocations.
// Refer to Constants section to know default values.
//
// For extra performance, configure the memory pool option properly. This option
// requires to specify the maximum number of concurrent use of the package. Failing to
// set a high-enough value would impact latency and throughput. Likewise, overestimating
// would yield in an unnecessary big memory footprint.
// The default configuration doesn't use a memory pool.
package bitcask

View File

@@ -8,7 +8,6 @@ func Example_withOptions() {
opts := []Option{
WithMaxKeySize(1024),
WithMaxValueSize(4096),
WithMemPool(10),
}
_, _ = Open("path/to/db", opts...)
}

11
go.mod
View File

@@ -1,15 +1,14 @@
module github.com/prologic/bitcask
go 1.12
require (
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d
github.com/gofrs/flock v0.7.1
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.2
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/pelletier/go-toml v1.4.0 // indirect
github.com/pkg/errors v0.8.1
github.com/plar/go-adaptive-radix-tree v1.0.1
github.com/sirupsen/logrus v1.4.2
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cobra v0.0.5
@@ -18,7 +17,7 @@ require (
github.com/spf13/viper v1.4.0
github.com/stretchr/testify v1.3.0
github.com/tidwall/redcon v1.0.0
golang.org/x/exp v0.0.0-20190718202018-cfdd5522f6f6
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa // indirect
golang.org/x/text v0.3.2 // indirect
)

25
go.sum
View File

@@ -17,12 +17,9 @@ github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d h1:TocZO8frNoxkwqFPePHFldSw8vLu+gBrlvFZYWqxiF4=
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d/go.mod h1:D6ICZm05D9VN1n/8iOtBxLpXtoGp6HDFUJ1RNVieOSE=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
@@ -35,16 +32,12 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
@@ -59,7 +52,6 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -79,8 +71,6 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg=
@@ -88,6 +78,8 @@ github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUr
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/plar/go-adaptive-radix-tree v1.0.1 h1:J+2qrXaKWLACw59s8SlTVYYxWjlUr/BlCsfkAzn96/0=
github.com/plar/go-adaptive-radix-tree v1.0.1/go.mod h1:Ot8d28EII3i7Lv4PSvBlF8ejiD/CtRYDuPsySJbSaK8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -120,7 +112,6 @@ github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmq
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
@@ -141,12 +132,11 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20190718202018-cfdd5522f6f6 h1:1xlTfQXUyQvvbx4TFsXRgoj9r0YUwcqq9XGX9u9OODU=
golang.org/x/exp v0.0.0-20190718202018-cfdd5522f6f6/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4=
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56 h1:estk1glOnSVeJ9tdEZZc5mAMDZk5lNJNyJ6DvrBkTEU=
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@@ -166,13 +156,13 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M=
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
@@ -188,7 +178,6 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

105
internal/codec.go Normal file
View File

@@ -0,0 +1,105 @@
package internal
import (
"bufio"
"encoding/binary"
"io"
"github.com/pkg/errors"
)
const (
KeySize = 4
ValueSize = 8
checksumSize = 4
)
// NewEncoder creates a streaming Entry encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: bufio.NewWriter(w)}
}
// Encoder wraps an underlying io.Writer and allows you to stream
// Entry encodings on it.
type Encoder struct {
w *bufio.Writer
}
// Encode takes any Entry and streams it to the underlying writer.
// Messages are framed with a key-length and value-length prefix.
func (e *Encoder) Encode(msg Entry) (int64, error) {
var bufKeyValue = make([]byte, KeySize+ValueSize)
binary.BigEndian.PutUint32(bufKeyValue[:KeySize], uint32(len(msg.Key)))
binary.BigEndian.PutUint64(bufKeyValue[KeySize:KeySize+ValueSize], uint64(len(msg.Value)))
if _, err := e.w.Write(bufKeyValue); err != nil {
return 0, errors.Wrap(err, "failed writing key & value length prefix")
}
if _, err := e.w.Write(msg.Key); err != nil {
return 0, errors.Wrap(err, "failed writing key data")
}
if _, err := e.w.Write(msg.Value); err != nil {
return 0, errors.Wrap(err, "failed writing value data")
}
bufChecksumSize := bufKeyValue[:checksumSize]
binary.BigEndian.PutUint32(bufChecksumSize, msg.Checksum)
if _, err := e.w.Write(bufChecksumSize); err != nil {
return 0, errors.Wrap(err, "failed writing checksum data")
}
if err := e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(KeySize + ValueSize + len(msg.Key) + len(msg.Value) + checksumSize), nil
}
// NewDecoder creates a streaming Entry decoder.
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{r: r}
}
// Decoder wraps an underlying io.Reader and allows you to stream
// Entry decodings on it.
type Decoder struct {
r io.Reader
}
func (d *Decoder) Decode(v *Entry) (int64, error) {
prefixBuf := make([]byte, KeySize+ValueSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil {
return 0, err
}
actualKeySize, actualValueSize := GetKeyValueSizes(prefixBuf)
buf := make([]byte, actualKeySize+actualValueSize+checksumSize)
if _, err = io.ReadFull(d.r, buf); err != nil {
return 0, errors.Wrap(translateError(err), "failed reading saved data")
}
DecodeWithoutPrefix(buf, actualKeySize, v)
return int64(KeySize + ValueSize + actualKeySize + actualValueSize + checksumSize), nil
}
func GetKeyValueSizes(buf []byte) (uint64, uint64) {
actualKeySize := binary.BigEndian.Uint32(buf[:KeySize])
actualValueSize := binary.BigEndian.Uint64(buf[KeySize:])
return uint64(actualKeySize), actualValueSize
}
func DecodeWithoutPrefix(buf []byte, valueOffset uint64, v *Entry) {
v.Key = buf[:valueOffset]
v.Value = buf[valueOffset : len(buf)-checksumSize]
v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:])
}
func translateError(err error) error {
if err == io.EOF {
return io.ErrUnexpectedEOF
}
return err
}

110
internal/codec_index.go Normal file
View File

@@ -0,0 +1,110 @@
package internal
import (
"encoding/binary"
"io"
art "github.com/plar/go-adaptive-radix-tree"
)
const (
Int32Size = 4
Int64Size = 8
FileIDSize = Int32Size
OffsetSize = Int64Size
SizeSize = Int64Size
)
func ReadBytes(r io.Reader) ([]byte, error) {
s := make([]byte, Int32Size)
_, err := io.ReadFull(r, s)
if err != nil {
return nil, err
}
size := binary.BigEndian.Uint32(s)
b := make([]byte, size)
_, err = io.ReadFull(r, b)
if err != nil {
return nil, err
}
return b, nil
}
func WriteBytes(b []byte, w io.Writer) (int, error) {
s := make([]byte, Int32Size)
binary.BigEndian.PutUint32(s, uint32(len(b)))
n, err := w.Write(s)
if err != nil {
return n, err
}
m, err := w.Write(b)
if err != nil {
return (n + m), err
}
return (n + m), nil
}
func ReadItem(r io.Reader) (Item, error) {
buf := make([]byte, (FileIDSize + OffsetSize + SizeSize))
_, err := io.ReadFull(r, buf)
if err != nil {
return Item{}, err
}
return Item{
FileID: int(binary.BigEndian.Uint32(buf[:FileIDSize])),
Offset: int64(binary.BigEndian.Uint64(buf[FileIDSize:(FileIDSize + OffsetSize)])),
Size: int64(binary.BigEndian.Uint64(buf[(FileIDSize + OffsetSize):])),
}, nil
}
func WriteItem(item Item, w io.Writer) (int, error) {
buf := make([]byte, (FileIDSize + OffsetSize + SizeSize))
binary.BigEndian.PutUint32(buf[:FileIDSize], uint32(item.FileID))
binary.BigEndian.PutUint64(buf[FileIDSize:(FileIDSize+OffsetSize)], uint64(item.Offset))
binary.BigEndian.PutUint64(buf[(FileIDSize+OffsetSize):], uint64(item.Size))
n, err := w.Write(buf)
if err != nil {
return 0, err
}
return n, nil
}
func ReadIndex(r io.Reader, t art.Tree) error {
for {
key, err := ReadBytes(r)
if err != nil {
if err == io.EOF {
break
}
return err
}
item, err := ReadItem(r)
if err != nil {
return err
}
t.Insert(key, item)
}
return nil
}
func WriteIndex(t art.Tree, w io.Writer) (err error) {
t.ForEach(func(node art.Node) bool {
_, err = WriteBytes(node.Key(), w)
if err != nil {
return false
}
item := node.Value().(Item)
_, err := WriteItem(item, w)
if err != nil {
return false
}
return true
})
return
}

View File

@@ -6,25 +6,18 @@ import (
"path/filepath"
"sync"
"github.com/oxtoacart/bpool"
"github.com/pkg/errors"
"golang.org/x/exp/mmap"
"github.com/gogo/protobuf/proto"
pb "github.com/prologic/bitcask/internal/proto"
"github.com/prologic/bitcask/internal/streampb"
)
const (
DefaultDatafileFilename = "%09d.data"
prefixSize = 8
)
var (
ErrReadonly = errors.New("error: read only datafile")
ErrReadError = errors.New("error: read error")
memPool *bpool.BufferPool
mxMemPool sync.RWMutex
)
@@ -36,8 +29,8 @@ type Datafile struct {
ra *mmap.ReaderAt
w *os.File
offset int64
dec *streampb.Decoder
enc *streampb.Encoder
dec *Decoder
enc *Encoder
}
func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
@@ -73,8 +66,8 @@ func NewDatafile(path string, id int, readonly bool) (*Datafile, error) {
offset := stat.Size()
dec := streampb.NewDecoder(r)
enc := streampb.NewEncoder(w)
dec := NewDecoder(r)
enc := NewEncoder(w)
return &Datafile{
id: id,
@@ -126,7 +119,7 @@ func (df *Datafile) Size() int64 {
return df.offset
}
func (df *Datafile) Read() (e pb.Entry, n int64, err error) {
func (df *Datafile) Read() (e Entry, n int64, err error) {
df.Lock()
defer df.Unlock()
@@ -138,20 +131,10 @@ func (df *Datafile) Read() (e pb.Entry, n int64, err error) {
return
}
func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
func (df *Datafile) ReadAt(index, size int64) (e Entry, err error) {
var n int
var b []byte
if memPool == nil {
b = make([]byte, size)
} else {
poolSlice := memPool.Get()
if poolSlice.Cap() < int(size) {
poolSlice.Grow(int(size) - poolSlice.Cap())
}
defer memPool.Put(poolSlice)
b = poolSlice.Bytes()[:size]
}
b := make([]byte, size)
if df.w == nil {
n, err = df.ra.ReadAt(b, index)
@@ -166,14 +149,13 @@ func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
return
}
err = proto.Unmarshal(b[prefixSize:], &e)
if err != nil {
return
}
valueOffset, _ := GetKeyValueSizes(b)
DecodeWithoutPrefix(b[KeySize+ValueSize:], valueOffset, &e)
return
}
func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {
func (df *Datafile) Write(e Entry) (int64, int64, error) {
if df.w == nil {
return -1, 0, ErrReadonly
}
@@ -183,7 +165,7 @@ func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {
e.Offset = df.offset
n, err := df.enc.Encode(&e)
n, err := df.enc.Encode(e)
if err != nil {
return -1, 0, err
}
@@ -191,15 +173,3 @@ func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {
return e.Offset, n, nil
}
// ConfigureMemPool configurate the mempool accordingly
func ConfigureMemPool(maxConcurrency *int) {
mxMemPool.Lock()
defer mxMemPool.Unlock()
if maxConcurrency == nil {
memPool = nil
} else {
memPool = bpool.NewBufferPool(*maxConcurrency)
}
return
}

View File

@@ -2,14 +2,20 @@ package internal
import (
"hash/crc32"
pb "github.com/prologic/bitcask/internal/proto"
)
func NewEntry(key string, value []byte) pb.Entry {
// Entry represents a key/value in the database
type Entry struct {
Checksum uint32
Key []byte
Offset int64
Value []byte
}
func NewEntry(key, value []byte) Entry {
checksum := crc32.ChecksumIEEE(value)
return pb.Entry{
return Entry{
Checksum: checksum,
Key: key,
Value: value,

7
internal/item.go Normal file
View File

@@ -0,0 +1,7 @@
package internal
type Item struct {
FileID int `json:"fileid"`
Offset int64 `json:"offset"`
Size int64 `json:"size"`
}

View File

@@ -1,115 +0,0 @@
package internal
import (
"bytes"
"encoding/gob"
"io"
"io/ioutil"
"os"
"sync"
)
type Item struct {
FileID int
Offset int64
Size int64
}
type Keydir struct {
sync.RWMutex
kv map[string]Item
}
func NewKeydir() *Keydir {
return &Keydir{
kv: make(map[string]Item),
}
}
func (k *Keydir) Add(key string, fileid int, offset, size int64) Item {
item := Item{
FileID: fileid,
Offset: offset,
Size: size,
}
k.Lock()
k.kv[key] = item
k.Unlock()
return item
}
func (k *Keydir) Get(key string) (Item, bool) {
k.RLock()
item, ok := k.kv[key]
k.RUnlock()
return item, ok
}
func (k *Keydir) Delete(key string) {
k.Lock()
delete(k.kv, key)
k.Unlock()
}
func (k *Keydir) Len() int {
return len(k.kv)
}
func (k *Keydir) Keys() chan string {
ch := make(chan string)
go func() {
k.RLock()
for key := range k.kv {
ch <- key
}
close(ch)
k.RUnlock()
}()
return ch
}
func (k *Keydir) Bytes() ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(k.kv)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (k *Keydir) Load(fn string) error {
f, err := os.Open(fn)
if err != nil {
return err
}
defer f.Close()
dec := gob.NewDecoder(f)
if err := dec.Decode(&k.kv); err != nil {
return err
}
return nil
}
func (k *Keydir) Save(fn string) error {
data, err := k.Bytes()
if err != nil {
return err
}
return ioutil.WriteFile(fn, data, 0644)
}
func NewKeydirFromBytes(r io.Reader) (*Keydir, error) {
k := NewKeydir()
dec := gob.NewDecoder(r)
err := dec.Decode(&k.kv)
if err != nil {
return nil, err
}
return k, nil
}

View File

@@ -1,3 +0,0 @@
package proto
//go:generate protoc --go_out=. entry.proto

View File

@@ -1,99 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: entry.proto
package proto
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Entry struct {
Checksum uint32 `protobuf:"varint,1,opt,name=Checksum,proto3" json:"Checksum,omitempty"`
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
Offset int64 `protobuf:"varint,3,opt,name=Offset,proto3" json:"Offset,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Entry) Reset() { *m = Entry{} }
func (m *Entry) String() string { return proto.CompactTextString(m) }
func (*Entry) ProtoMessage() {}
func (*Entry) Descriptor() ([]byte, []int) {
return fileDescriptor_entry_db5b99f271e6b4b6, []int{0}
}
func (m *Entry) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Entry.Unmarshal(m, b)
}
func (m *Entry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Entry.Marshal(b, m, deterministic)
}
func (dst *Entry) XXX_Merge(src proto.Message) {
xxx_messageInfo_Entry.Merge(dst, src)
}
func (m *Entry) XXX_Size() int {
return xxx_messageInfo_Entry.Size(m)
}
func (m *Entry) XXX_DiscardUnknown() {
xxx_messageInfo_Entry.DiscardUnknown(m)
}
var xxx_messageInfo_Entry proto.InternalMessageInfo
func (m *Entry) GetChecksum() uint32 {
if m != nil {
return m.Checksum
}
return 0
}
func (m *Entry) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
func (m *Entry) GetOffset() int64 {
if m != nil {
return m.Offset
}
return 0
}
func (m *Entry) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func init() {
proto.RegisterType((*Entry)(nil), "proto.Entry")
}
func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_db5b99f271e6b4b6) }
var fileDescriptor_entry_db5b99f271e6b4b6 = []byte{
// 126 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xc9, 0x5c, 0xac, 0xae,
0x20, 0x51, 0x21, 0x29, 0x2e, 0x0e, 0xe7, 0x8c, 0xd4, 0xe4, 0xec, 0xe2, 0xd2, 0x5c, 0x09, 0x46,
0x05, 0x46, 0x0d, 0xde, 0x20, 0x38, 0x5f, 0x48, 0x80, 0x8b, 0xd9, 0x3b, 0xb5, 0x52, 0x82, 0x49,
0x81, 0x51, 0x83, 0x33, 0x08, 0xc4, 0x14, 0x12, 0xe3, 0x62, 0xf3, 0x4f, 0x4b, 0x2b, 0x4e, 0x2d,
0x91, 0x60, 0x56, 0x60, 0xd4, 0x60, 0x0e, 0x82, 0xf2, 0x84, 0x44, 0xb8, 0x58, 0xc3, 0x12, 0x73,
0x4a, 0x53, 0x25, 0x58, 0x14, 0x18, 0x35, 0x78, 0x82, 0x20, 0x9c, 0x24, 0x36, 0xb0, 0x5d, 0xc6,
0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x76, 0xd2, 0x3e, 0x83, 0x81, 0x00, 0x00, 0x00,
}

View File

@@ -1,10 +0,0 @@
syntax = "proto3";
package proto;
message Entry {
uint32 Checksum = 1;
string Key = 2;
int64 Offset = 3;
bytes Value = 4;
}

View File

@@ -1,97 +0,0 @@
package streampb
import (
"bufio"
"encoding/binary"
"io"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)
const (
// prefixSize is the number of bytes we preallocate for storing
// our big endian lenth prefix buffer.
prefixSize = 8
)
// NewEncoder creates a streaming protobuf encoder.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: bufio.NewWriter(w)}
}
// Encoder wraps an underlying io.Writer and allows you to stream
// proto encodings on it.
type Encoder struct {
w *bufio.Writer
}
// Encode takes any proto.Message and streams it to the underlying writer.
// Messages are framed with a length prefix.
func (e *Encoder) Encode(msg proto.Message) (int64, error) {
prefixBuf := make([]byte, prefixSize)
buf, err := proto.Marshal(msg)
if err != nil {
return 0, err
}
binary.BigEndian.PutUint64(prefixBuf, uint64(len(buf)))
if _, err := e.w.Write(prefixBuf); err != nil {
return 0, errors.Wrap(err, "failed writing length prefix")
}
n, err := e.w.Write(buf)
if err != nil {
return 0, errors.Wrap(err, "failed writing marshaled data")
}
if err = e.w.Flush(); err != nil {
return 0, errors.Wrap(err, "failed flushing data")
}
return int64(n + prefixSize), nil
}
// NewDecoder creates a streaming protobuf decoder.
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{r: r}
}
// Decoder wraps an underlying io.Reader and allows you to stream
// proto decodings on it.
type Decoder struct {
r io.Reader
}
// Decode takes a proto.Message and unmarshals the next payload in the
// underlying io.Reader. It returns an EOF when it's done.
func (d *Decoder) Decode(v proto.Message) (int64, error) {
prefixBuf := make([]byte, prefixSize)
_, err := io.ReadFull(d.r, prefixBuf)
if err != nil {
return 0, err
}
n := binary.BigEndian.Uint64(prefixBuf)
buf := make([]byte, n)
idx := uint64(0)
for idx < n {
m, err := d.r.Read(buf[idx:n])
if err != nil {
return 0, errors.Wrap(translateError(err), "failed reading marshaled data")
}
idx += uint64(m)
}
return int64(idx + prefixSize), proto.Unmarshal(buf[:n], v)
}
func translateError(err error) error {
if err == io.EOF {
return io.ErrUnexpectedEOF
}
return err
}

View File

@@ -2,7 +2,6 @@ package bitcask
import (
"encoding/json"
"errors"
"io/ioutil"
"path/filepath"
)
@@ -18,12 +17,6 @@ const (
DefaultMaxValueSize = 1 << 16 // 65KB
)
var (
// ErrMaxConcurrencyLowerEqZero is the error returned for
// maxConcurrency option not greater than zero
ErrMaxConcurrencyLowerEqZero = errors.New("error: maxConcurrency must be greater than zero")
)
// Option is a function that takes a config struct and modifies it
type Option func(*config) error
@@ -31,26 +24,29 @@ type config struct {
maxDatafileSize int
maxKeySize int
maxValueSize int
maxConcurrency *int
sync bool
}
func (c *config) MarshalJSON() ([]byte, error) {
return json.Marshal(struct {
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
Sync bool `json:"sync"`
}{
MaxDatafileSize: c.maxDatafileSize,
MaxKeySize: c.maxKeySize,
MaxValueSize: c.maxValueSize,
Sync: c.sync,
})
}
func getConfig(path string) (*config, error) {
type Config struct {
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
Sync bool `json:"sync"`
}
var cfg Config
@@ -68,6 +64,7 @@ func getConfig(path string) (*config, error) {
maxDatafileSize: cfg.MaxDatafileSize,
maxKeySize: cfg.MaxKeySize,
maxValueSize: cfg.MaxValueSize,
sync: cfg.Sync,
}, nil
}
@@ -103,13 +100,11 @@ func WithMaxValueSize(size int) Option {
}
}
// WithMemPool configures usage of a memory pool to avoid allocations
func WithMemPool(maxConcurrency int) Option {
// WithSync causes Sync() to be called on every key/value written increasing
// durability and safety at the expense of performance
func WithSync(sync bool) Option {
return func(cfg *config) error {
if maxConcurrency <= 0 {
return ErrMaxConcurrencyLowerEqZero
}
cfg.maxConcurrency = &maxConcurrency
cfg.sync = sync
return nil
}
}