Compare commits

...

21 Commits

Author SHA1 Message Date
James Mills
d0c913ccee Revert "Use []byte byte slices as keys directly avoiding serialing string(s) (#46)" (#50)
This reverts commit 3c1808cad3.
2019-08-08 08:06:38 +10:00
James Mills
6b372d8334 Added export/import sub-commands to backup/resotre a database (#48) 2019-08-08 08:00:29 +10:00
James Mills
3c1808cad3 Use []byte byte slices as keys directly avoiding serialing string(s) (#46) 2019-08-08 07:59:11 +10:00
James Mills
5d1dd6657a Fixed handling of missing config.json from cli behavior 2019-08-07 21:47:51 +10:00
James Mills
1ba9ca46e3 Rename set command to put and cleanup the command's docs 2019-08-07 21:44:33 +10:00
James Mills
2a419c46d2 Update README.md 2019-08-07 13:24:38 +10:00
James Mills
e543fc38fb Added AUTHORS file to record contributors beyond the scope of Github metadata (#41) 2019-08-07 13:21:09 +10:00
James Mills
82e26449fa Added the same functional options to the bitcask CLI and persist options to the db store (#40) 2019-08-07 10:23:10 +10:00
James Mills
bce2721be4 Update README.md 2019-08-06 08:15:03 +10:00
Ignacio Hagopian
f2b5515e03 update trie dependency to take advantage of improvements (#45) 2019-08-06 08:05:41 +10:00
James Mills
8b684b635d Update CONTRIBUTING.md 2019-08-05 19:48:32 +10:00
Ignacio Hagopian
a407905ae2 Improve Get/Put performance with optional mempooling (#36)
* avoid unnecessary use of encoder/decoder to decrease memory allocations

* add an optional configurable mempool to avoid extra allocs

* add doc.go with examples
2019-08-05 07:23:07 +10:00
James Mills
6ceeccfd64 Update README.md 2019-08-03 19:49:15 +10:00
James Mills
35dc7e70d2 Update README.md 2019-08-03 19:47:23 +10:00
James Mills
6cc1154611 Update README.md 2019-08-03 19:46:18 +10:00
Ignacio Hagopian
8aa66c66da keydir: avoid defers (#34) 2019-08-01 19:18:05 +10:00
Ignacio Hagopian
e3242c8426 README: typos (#35) 2019-08-01 13:48:36 +10:00
James Mills
912371645d Fixed an off-by-one bug with managing datafiles (#31) 2019-07-29 23:49:37 +10:00
James Mills
bc782a3083 Update README.md 2019-07-27 07:57:38 +10:00
James Mills
a2161179ef Update README.md 2019-07-27 07:56:34 +10:00
James Mills
51bac21c0a Improves Merge() operation by also pruning old key/value pairs (#29)
* Added new API Stats() and Prune()

* Improved Merge() logic to also prune old key/values and actually reclaim disk space

* Added backward compat for the old Merge() function

* Refactor indexing of keys to items (hints)

* Remove redundant TestOpenMerge

* Add unit test for Stats()

* Improve TestMerge()
2019-07-27 07:52:25 +10:00
19 changed files with 937 additions and 338 deletions

13
AUTHORS Normal file
View File

@@ -0,0 +1,13 @@
# Entries should be added alphabetically in the form:
# Name or Organization <email address>
# The email address is not required for organizations.
Awn Umar <awn@spacetime.dev>
Christian Muehlhaeuser <muesli@gmail.com>
Ignacio Hagopian <jsign.uy@gmail.com>
James Mills <prologic@shortcircuit.net.au>
Jesse Donat <donatj@gmail.com>
Kebert Xela kebertxela
panyun panyun
Whemoon Jang <palindrom615@gmail.com>
Yury Fedorov orlangure

View File

@@ -1,8 +1,12 @@
# Contributing
No preference. If you know hot to use Github and contributed to open source projects before then:
No preference. If you know how to use Github and have contributed to open source projects before then:
* File an issue
* Submit a pull request
* File an issue + Submit a pull request
* Use this project somewhere :)
Be sure to add yourself to the [AUTHORS](/AUTHORS) file when you submit your PR(s). Every contribution counts no how big or small!
Thanks for using Bitcask!

View File

@@ -4,14 +4,9 @@
[![CodeCov](https://codecov.io/gh/prologic/bitcask/branch/master/graph/badge.svg)](https://codecov.io/gh/prologic/bitcask)
[![Go Report Card](https://goreportcard.com/badge/prologic/bitcask)](https://goreportcard.com/report/prologic/bitcask)
[![GoDoc](https://godoc.org/github.com/prologic/bitcask?status.svg)](https://godoc.org/github.com/prologic/bitcask)
[![Sourcegraph](https://sourcegraph.com/github.com/prologic/bitcask/-/badge.svg)](https://sourcegraph.com/github.com/prologic/bitcask?badge)
[![Github all releases](https://img.shields.io/github/downloads/prologic/bitcask/total.svg)](https://github.com/prologic/bitcask/releases)
[![GitHub license](https://img.shields.io/github/license/prologic/bitcask.svg)](https://github.com/prologic/bitcask)
[![](https://images.microbadger.com/badges/version/prologic/bitcask.svg)](https://microbadger.com/images/prologic/bitcask)
[![](https://images.microbadger.com/badges/image/prologic/bitcask.svg)](https://microbadger.com/images/prologic/bitcask)
A high performance Key/Value store written in [Go](https://golang.org) with a predictable read/write performance and high throughput. Uses a [Bitcask](https://en.wikipedia.org/wiki/Bitcask) on-disk layout (LSM+WAL) similar to [Riak](https://riak.com/). 🗃️
A high performance Key/Value store written in [Go](https://golang.org) with a predictable read/write performance and high throughput. Uses a [Bitcask](https://en.wikipedia.org/wiki/Bitcask) on-disk layout (LSM+WAL) similar to [Riak](https://riak.com/)
For a more feature-complete Redis-compatible server, distributed key/value store have a look at [Bitraft](https://github.com/prologic/bitraft) which uses this library as its backend. Use [Bitcask](https://github.com/prologic/bitcask) as a starting point or if you want to embed in your application, use [Bitraft](https://github.com/prologic/bitraft) if you need a complete server/client solution with high availability with a Redis-compatible API.
@@ -21,14 +16,14 @@ For a more feature-complete Redis-compatible server, distributed key/value store
* Builtin CLI (`bitcask`)
* Builtin Redis-compatible server (`bitcaskd`)
* Predictable read/write performance
* Low latecny
* Low latency
* High throughput (See: [Performance](README.md#Performance) )
## Development
1. Get the source
```#!bash
```#!sh
$ git clone https://github.com/prologic/bitcask.git
```
@@ -37,20 +32,19 @@ $ git clone https://github.com/prologic/bitcask.git
This library uses [Protobuf](https://github.com/protocolbuffers/protobuf) to serialize data on disk. Please follow the
instructions for installing `protobuf` on your system. You will also need the
following Go libraries/tools to generate Go code from Protobuf defs:
- [protoc-gen-go](https://github.com/golang/protobuf)
3. Build the project
```#!bash
```#!sh
$ make
```
This will invoke `go generate` and `go build`.
- [protoc-gen-go](https://github.com/golang/protobuf)
## Install
```#!bash
```#!sh
$ go get github.com/prologic/bitcask
```
@@ -58,7 +52,7 @@ $ go get github.com/prologic/bitcask
Install the package into your project:
```#!bash
```#!sh
$ go get github.com/prologic/bitcask
```
@@ -80,7 +74,7 @@ documentation and other examples.
## Usage (tool)
```#!bash
```#!sh
$ bitcask -p /tmp/db set Hello World
$ bitcask -p /tmp/db get Hello
World
@@ -90,14 +84,14 @@ World
There is also a builtin very simple Redis-compatible server called `bitcaskd`:
```#!bash
```#!sh
$ ./bitcaskd ./tmp
INFO[0000] starting bitcaskd v0.0.7@146f777 bind=":6379" path=./tmp
```
Example session:
```
```#!sh
$ telnet localhost 6379
Trying ::1...
Connected to localhost.
@@ -122,7 +116,7 @@ Connection closed by foreign host.
You can also use the [Bitcask Docker Image](https://cloud.docker.com/u/prologic/repository/docker/prologic/bitcask):
```#!bash
```#!sh
$ docker pull prologic/bitcask
$ docker run -d -p 6379:6379 prologic/bitcask
```
@@ -131,39 +125,59 @@ $ docker run -d -p 6379:6379 prologic/bitcask
Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7:
```
```#!sh
$ make bench
...
BenchmarkGet/128B-4 500000 2537 ns/op 672 B/op 7 allocs/op
BenchmarkGet/256B-4 500000 2629 ns/op 1056 B/op 7 allocs/op
BenchmarkGet/512B-4 500000 2773 ns/op 1888 B/op 7 allocs/op
BenchmarkGet/1K-4 500000 3202 ns/op 3552 B/op 7 allocs/op
BenchmarkGet/2K-4 300000 3904 ns/op 6880 B/op 7 allocs/op
BenchmarkGet/4K-4 300000 5678 ns/op 14048 B/op 7 allocs/op
BenchmarkGet/8K-4 200000 8948 ns/op 27360 B/op 7 allocs/op
BenchmarkGet/16K-4 100000 14635 ns/op 53472 B/op 7 allocs/op
BenchmarkGet/32K-4 50000 28292 ns/op 114912 B/op 7 allocs/op
goos: darwin
goarch: amd64
pkg: github.com/prologic/bitcask
BenchmarkPut/128B-4 200000 8173 ns/op 409 B/op 6 allocs/op
BenchmarkPut/256B-4 200000 8404 ns/op 538 B/op 6 allocs/op
BenchmarkPut/512B-4 200000 9741 ns/op 829 B/op 6 allocs/op
BenchmarkPut/1K-4 100000 13118 ns/op 1411 B/op 6 allocs/op
BenchmarkPut/2K-4 100000 17982 ns/op 2573 B/op 6 allocs/op
BenchmarkPut/4K-4 50000 35477 ns/op 5154 B/op 6 allocs/op
BenchmarkPut/8K-4 30000 54021 ns/op 9804 B/op 6 allocs/op
BenchmarkPut/16K-4 20000 96551 ns/op 18849 B/op 6 allocs/op
BenchmarkPut/32K-4 10000 129957 ns/op 41561 B/op 7 allocs/op
BenchmarkGet/128B-4 300000 3913 ns/op 32.71 MB/s 387 B/op 4 allocs/op
BenchmarkGet/128BWithPool-4 300000 4143 ns/op 30.89 MB/s 227 B/op 3 allocs/op
BenchmarkGet/256B-4 300000 3919 ns/op 65.31 MB/s 643 B/op 4 allocs/op
BenchmarkGet/256BWithPool-4 300000 4270 ns/op 59.95 MB/s 355 B/op 3 allocs/op
BenchmarkGet/512B-4 300000 4248 ns/op 120.52 MB/s 1187 B/op 4 allocs/op
BenchmarkGet/512BWithPool-4 300000 4676 ns/op 109.48 MB/s 611 B/op 3 allocs/op
BenchmarkGet/1K-4 200000 5248 ns/op 195.10 MB/s 2275 B/op 4 allocs/op
BenchmarkGet/1KWithPool-4 200000 5270 ns/op 194.28 MB/s 1123 B/op 3 allocs/op
BenchmarkGet/2K-4 200000 6229 ns/op 328.74 MB/s 4451 B/op 4 allocs/op
BenchmarkGet/2KWithPool-4 200000 6282 ns/op 325.99 MB/s 2147 B/op 3 allocs/op
BenchmarkGet/4K-4 200000 9027 ns/op 453.74 MB/s 9059 B/op 4 allocs/op
BenchmarkGet/4KWithPool-4 200000 8906 ns/op 459.87 MB/s 4195 B/op 3 allocs/op
BenchmarkGet/8K-4 100000 12024 ns/op 681.28 MB/s 17763 B/op 4 allocs/op
BenchmarkGet/8KWithPool-4 200000 11103 ns/op 737.79 MB/s 8291 B/op 3 allocs/op
BenchmarkGet/16K-4 100000 16844 ns/op 972.65 MB/s 34915 B/op 4 allocs/op
BenchmarkGet/16KWithPool-4 100000 14575 ns/op 1124.10 MB/s 16483 B/op 3 allocs/op
BenchmarkGet/32K-4 50000 27770 ns/op 1179.97 MB/s 73827 B/op 4 allocs/op
BenchmarkGet/32KWithPool-4 100000 24495 ns/op 1337.74 MB/s 32867 B/op 3 allocs/op
BenchmarkScan-4 1000000 2011 ns/op 493 B/op 25 allocs/op
BenchmarkPut/128B-4 100000 17492 ns/op 7.32 MB/s 441 B/op 6 allocs/op
BenchmarkPut/256B-4 100000 17234 ns/op 14.85 MB/s 571 B/op 6 allocs/op
BenchmarkPut/512B-4 100000 22837 ns/op 22.42 MB/s 861 B/op 6 allocs/op
BenchmarkPut/1K-4 50000 30333 ns/op 33.76 MB/s 1443 B/op 6 allocs/op
BenchmarkPut/2K-4 30000 45304 ns/op 45.21 MB/s 2606 B/op 6 allocs/op
BenchmarkPut/4K-4 20000 83953 ns/op 48.79 MB/s 5187 B/op 6 allocs/op
BenchmarkPut/8K-4 10000 142142 ns/op 57.63 MB/s 9845 B/op 6 allocs/op
BenchmarkPut/16K-4 5000 206722 ns/op 79.26 MB/s 18884 B/op 6 allocs/op
BenchmarkPut/32K-4 5000 361108 ns/op 90.74 MB/s 41582 B/op 7 allocs/op
BenchmarkScan-4 1000000 1679 ns/op 408 B/op 16 allocs/op
PASS
```
For 128B values:
* ~400,000 reads/sec
* ~130,000 writes/sec
* ~200,000 reads/sec
* ~50,000 writes/sec
The full benchmark above shows linear performance as you increase key/value sizes.
The full benchmark above shows linear performance as you increase key/value sizes. Memory pooling starts to become advantageous for larger values.
## Contributors
Thank you to all those that have contributed to this project, battle-tested it, used it in their own projects or pdocuts, fixed bugs, improved performance and even fix tiny tpyos in documentation! Thank you and keep contirbuting!
You can find an [AUTHORS](/AUTHORS) file where we keep a list of contributors to the project. If you contriibute a PR please consider adding your name there. There is also Github's own [Contributors](https://github.com/prologic/bitcask/graphs/contributors) statistics.
## License
bitcask is licensed under the [MIT License](https://github.com/prologic/bitcask/blob/master/LICENSE)
bitcask is licensed under the term of the [MIT License](https://github.com/prologic/bitcask/blob/master/LICENSE)

View File

@@ -1,17 +1,18 @@
package bitcask
import (
"encoding/json"
"errors"
"hash/crc32"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"sync"
"github.com/derekparker/trie"
"github.com/gofrs/flock"
"github.com/prologic/trie"
"github.com/prologic/bitcask/internal"
)
@@ -35,6 +36,10 @@ var (
// ErrDatabaseLocked is the error returned if the database is locked
// (typically opened by another process)
ErrDatabaseLocked = errors.New("error: database locked")
// ErrCreatingMemPool is the error returned when trying to configurate
// the mempool fails
ErrCreatingMemPool = errors.New("error: creating the mempool failed")
)
// Bitcask is a struct that represents a on-disk LSM and WAL data structure
@@ -46,13 +51,38 @@ type Bitcask struct {
*flock.Flock
config *config
options []Option
path string
curr *internal.Datafile
keydir *internal.Keydir
datafiles []*internal.Datafile
datafiles map[int]*internal.Datafile
trie *trie.Trie
}
// Stats is a struct returned by Stats() on an open Bitcask instance
type Stats struct {
Datafiles int
Keys int
Size int64
}
// Stats returns statistics about the database including the number of
// data files, keys and overall size on disk of the data
func (b *Bitcask) Stats() (stats Stats, err error) {
var size int64
size, err = internal.DirSize(b.path)
if err != nil {
return
}
stats.Datafiles = len(b.datafiles)
stats.Keys = b.keydir.Len()
stats.Size = size
return
}
// Close closes the database and removes the lock. It is important to call
// Close() as this is the only way to cleanup the lock held by the open
// database.
@@ -62,9 +92,16 @@ func (b *Bitcask) Close() error {
os.Remove(b.Flock.Path())
}()
for _, df := range b.datafiles {
df.Close()
if err := b.keydir.Save(path.Join(b.path, "index")); err != nil {
return err
}
for _, df := range b.datafiles {
if err := df.Close(); err != nil {
return err
}
}
return b.curr.Close()
}
@@ -188,14 +225,16 @@ func (b *Bitcask) put(key string, value []byte) (int64, int64, error) {
return -1, 0, err
}
df, err := internal.NewDatafile(b.path, b.curr.FileID(), true)
id := b.curr.FileID()
df, err := internal.NewDatafile(b.path, id, true)
if err != nil {
return -1, 0, err
}
b.datafiles = append(b.datafiles, df)
b.datafiles[id] = df
id := b.curr.FileID() + 1
id = b.curr.FileID() + 1
curr, err := internal.NewDatafile(b.path, id, false)
if err != nil {
return -1, 0, err
@@ -207,176 +246,72 @@ func (b *Bitcask) put(key string, value []byte) (int64, int64, error) {
return b.curr.Write(e)
}
// Merge merges all datafiles in the database creating hint files for faster
// startup. Old keys are squashed and deleted keys removes. Call this function
// periodically to reclaim disk space.
func Merge(path string, force bool) error {
fns, err := internal.GetDatafiles(path)
if err != nil {
return err
}
ids, err := internal.ParseIds(fns)
if err != nil {
return err
}
// Do not merge if we only have 1 Datafile
if len(ids) <= 1 {
return nil
}
// Don't merge the Active Datafile (the last one)
fns = fns[:len(fns)-1]
ids = ids[:len(ids)-1]
temp, err := ioutil.TempDir(path, "merge")
if err != nil {
return err
}
defer os.RemoveAll(temp)
for i, fn := range fns {
// Don't merge Datafiles whose .hint files we've already generated
// (they are already merged); unless we set the force flag to true
// (forcing a re-merge).
if filepath.Ext(fn) == ".hint" && !force {
// Already merged
continue
}
id := ids[i]
keydir := internal.NewKeydir()
df, err := internal.NewDatafile(path, id, true)
if err != nil {
return err
}
defer df.Close()
for {
e, n, err := df.Read()
if err != nil {
if err == io.EOF {
break
}
return err
}
// Tombstone value (deleted key)
if len(e.Value) == 0 {
keydir.Delete(e.Key)
continue
}
keydir.Add(e.Key, ids[i], e.Offset, n)
}
tempdf, err := internal.NewDatafile(temp, id, false)
if err != nil {
return err
}
defer tempdf.Close()
for key := range keydir.Keys() {
item, _ := keydir.Get(key)
e, err := df.ReadAt(item.Offset, item.Size)
if err != nil {
return err
}
_, _, err = tempdf.Write(e)
if err != nil {
return err
}
}
err = tempdf.Close()
func (b *Bitcask) readConfig() error {
if internal.Exists(filepath.Join(b.path, "config.json")) {
data, err := ioutil.ReadFile(filepath.Join(b.path, "config.json"))
if err != nil {
return err
}
err = df.Close()
if err != nil {
return err
}
err = os.Rename(tempdf.Name(), df.Name())
if err != nil {
return err
}
hint := strings.TrimSuffix(df.Name(), ".data") + ".hint"
err = keydir.Save(hint)
if err != nil {
if err := json.Unmarshal(data, &b.config); err != nil {
return err
}
}
return nil
}
// Open opens the database at the given path with optional options.
// Options can be provided with the `WithXXX` functions that provide
// configuration options as functions.
func Open(path string, options ...Option) (*Bitcask, error) {
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
err := Merge(path, false)
func (b *Bitcask) writeConfig() error {
data, err := json.Marshal(b.config)
if err != nil {
return nil, err
return err
}
return ioutil.WriteFile(filepath.Join(b.path, "config.json"), data, 0600)
}
fns, err := internal.GetDatafiles(path)
func (b *Bitcask) reopen() error {
b.mu.Lock()
defer b.mu.Unlock()
fns, err := internal.GetDatafiles(b.path)
if err != nil {
return nil, err
return err
}
ids, err := internal.ParseIds(fns)
if err != nil {
return nil, err
return err
}
var datafiles []*internal.Datafile
datafiles := make(map[int]*internal.Datafile, len(ids))
for _, id := range ids {
df, err := internal.NewDatafile(b.path, id, true)
if err != nil {
return err
}
datafiles[id] = df
}
keydir := internal.NewKeydir()
trie := trie.New()
for i, fn := range fns {
df, err := internal.NewDatafile(path, ids[i], true)
if err != nil {
return nil, err
if internal.Exists(path.Join(b.path, "index")) {
if err := keydir.Load(path.Join(b.path, "index")); err != nil {
return err
}
datafiles = append(datafiles, df)
if filepath.Ext(fn) == ".hint" {
f, err := os.Open(filepath.Join(path, fn))
if err != nil {
return nil, err
}
defer f.Close()
hint, err := internal.NewKeydirFromBytes(f)
if err != nil {
return nil, err
}
for key := range hint.Keys() {
item, _ := hint.Get(key)
_ = keydir.Add(key, item.FileID, item.Offset, item.Size)
trie.Add(key, item)
}
} else {
for key := range keydir.Keys() {
item, _ := keydir.Get(key)
trie.Add(key, item)
}
} else {
for i, df := range datafiles {
for {
e, n, err := df.Read()
if err != nil {
if err == io.EOF {
break
}
return nil, err
return err
}
// Tombstone value (deleted key)
@@ -396,28 +331,134 @@ func Open(path string, options ...Option) (*Bitcask, error) {
id = ids[(len(ids) - 1)]
}
curr, err := internal.NewDatafile(path, id, false)
curr, err := internal.NewDatafile(b.path, id, false)
if err != nil {
return err
}
b.curr = curr
b.datafiles = datafiles
b.keydir = keydir
b.trie = trie
return nil
}
// Merge merges all datafiles in the database. Old keys are squashed
// and deleted keys removes. Duplicate key/value pairs are also removed.
// Call this function periodically to reclaim disk space.
func (b *Bitcask) Merge() error {
// Temporary merged database path
temp, err := ioutil.TempDir(b.path, "merge")
if err != nil {
return err
}
defer os.RemoveAll(temp)
// Create a merged database
mdb, err := Open(temp, b.options...)
if err != nil {
return err
}
// Rewrite all key/value pairs into merged database
// Doing this automatically strips deleted keys and
// old key/value pairs
err = b.Fold(func(key string) error {
value, err := b.Get(key)
if err != nil {
return err
}
if err := mdb.Put(key, value); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
err = mdb.Close()
if err != nil {
return err
}
// Close the database
err = b.Close()
if err != nil {
return err
}
// Remove all data files
files, err := ioutil.ReadDir(b.path)
if err != nil {
return err
}
for _, file := range files {
if !file.IsDir() {
err := os.RemoveAll(path.Join([]string{b.path, file.Name()}...))
if err != nil {
return err
}
}
}
// Rename all merged data files
files, err = ioutil.ReadDir(mdb.path)
if err != nil {
return err
}
for _, file := range files {
err := os.Rename(
path.Join([]string{mdb.path, file.Name()}...),
path.Join([]string{b.path, file.Name()}...),
)
if err != nil {
return err
}
}
// And finally reopen the database
return b.reopen()
}
// Open opens the database at the given path with optional options.
// Options can be provided with the `WithXXX` functions that provide
// configuration options as functions.
func Open(path string, options ...Option) (*Bitcask, error) {
var (
cfg *config
err error
)
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
cfg, err = getConfig(path)
if err != nil {
cfg = newDefaultConfig()
}
bitcask := &Bitcask{
Flock: flock.New(filepath.Join(path, "lock")),
config: newDefaultConfig(),
path: path,
curr: curr,
keydir: keydir,
datafiles: datafiles,
trie: trie,
Flock: flock.New(filepath.Join(path, "lock")),
config: cfg,
options: options,
path: path,
}
for _, opt := range options {
err = opt(bitcask.config)
if err != nil {
if err := opt(bitcask.config); err != nil {
return nil, err
}
}
internal.ConfigureMemPool(bitcask.config.maxConcurrency)
locked, err := bitcask.Flock.TryLock()
if err != nil {
return nil, err
@@ -427,5 +468,26 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return nil, ErrDatabaseLocked
}
if err := bitcask.writeConfig(); err != nil {
return nil, err
}
if err := bitcask.reopen(); err != nil {
return nil, err
}
return bitcask, nil
}
// Merge calls Bitcask.Merge()
// XXX: Deprecated; Please use the `.Merge()` method
// XXX: This is only kept here for backwards compatibility
// it will be removed in future releases at some point
func Merge(path string, force bool) error {
db, err := Open(path)
if err != nil {
return err
}
return db.Merge()
}

View File

@@ -211,38 +211,39 @@ func TestMaxValueSize(t *testing.T) {
})
}
func TestOpenMerge(t *testing.T) {
func TestStats(t *testing.T) {
var (
db *Bitcask
err error
)
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
t.Run("Setup", func(t *testing.T) {
var (
db *Bitcask
err error
)
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxDatafileSize(32))
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
for i := 0; i < 1024; i++ {
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
assert.NoError(err)
}
err := db.Put("foo", []byte("bar"))
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
for i := 0; i < 32; i++ {
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
assert.NoError(err)
val, err := db.Get(string(i))
assert.NoError(err)
assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
}
val, err := db.Get("foo")
assert.NoError(err)
assert.Equal([]byte("bar"), val)
})
t.Run("Stats", func(t *testing.T) {
stats, err := db.Stats()
assert.NoError(err)
assert.Equal(stats.Datafiles, 0)
assert.Equal(stats.Keys, 1)
})
t.Run("Sync", func(t *testing.T) {
@@ -255,34 +256,9 @@ func TestOpenMerge(t *testing.T) {
assert.NoError(err)
})
})
t.Run("Merge", func(t *testing.T) {
var (
db *Bitcask
err error
)
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
for i := 0; i < 32; i++ {
val, err := db.Get(string(i))
assert.NoError(err)
assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
}
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
})
}
func TestMergeOpen(t *testing.T) {
func TestMerge(t *testing.T) {
var (
db *Bitcask
err error
@@ -300,22 +276,40 @@ func TestMergeOpen(t *testing.T) {
})
t.Run("Put", func(t *testing.T) {
for i := 0; i < 1024; i++ {
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
err := db.Put("foo", []byte("bar"))
assert.NoError(err)
})
s1, err := db.Stats()
assert.NoError(err)
assert.Equal(0, s1.Datafiles)
assert.Equal(1, s1.Keys)
t.Run("Put", func(t *testing.T) {
for i := 0; i < 10; i++ {
err := db.Put("foo", []byte("bar"))
assert.NoError(err)
}
})
t.Run("Get", func(t *testing.T) {
for i := 0; i < 32; i++ {
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
assert.NoError(err)
val, err := db.Get(string(i))
assert.NoError(err)
assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
}
s2, err := db.Stats()
assert.NoError(err)
assert.Equal(5, s2.Datafiles)
assert.Equal(1, s2.Keys)
assert.True(s2.Size > s1.Size)
t.Run("Merge", func(t *testing.T) {
err := db.Merge()
assert.NoError(err)
})
s3, err := db.Stats()
assert.NoError(err)
assert.Equal(1, s3.Datafiles)
assert.Equal(1, s3.Keys)
assert.True(s3.Size > s1.Size)
assert.True(s3.Size < s2.Size)
t.Run("Sync", func(t *testing.T) {
err = db.Sync()
assert.NoError(err)
@@ -326,31 +320,6 @@ func TestMergeOpen(t *testing.T) {
assert.NoError(err)
})
})
t.Run("Merge", func(t *testing.T) {
t.Run("Merge", func(t *testing.T) {
err = Merge(testdir, true)
assert.NoError(err)
})
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
for i := 0; i < 32; i++ {
val, err := db.Get(string(i))
assert.NoError(err)
assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
}
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
})
}
func TestConcurrent(t *testing.T) {
@@ -499,8 +468,9 @@ func TestLocking(t *testing.T) {
}
type benchmarkTestCase struct {
name string
size int
name string
size int
withPool bool
}
func BenchmarkGet(b *testing.B) {
@@ -515,22 +485,25 @@ func BenchmarkGet(b *testing.B) {
}
defer os.RemoveAll(testdir)
db, err := Open(testdir)
if err != nil {
b.Fatal(err)
}
defer db.Close()
tests := []benchmarkTestCase{
{"128B", 128},
{"256B", 256},
{"512B", 512},
{"1K", 1024},
{"2K", 2048},
{"4K", 4096},
{"8K", 8192},
{"16K", 16384},
{"32K", 32768},
{"128B", 128, false},
{"128BWithPool", 128, true},
{"256B", 256, false},
{"256BWithPool", 256, true},
{"512B", 512, false},
{"512BWithPool", 512, true},
{"1K", 1024, false},
{"1KWithPool", 1024, true},
{"2K", 2048, false},
{"2KWithPool", 2048, true},
{"4K", 4096, false},
{"4KWithPool", 4096, true},
{"8K", 8192, false},
{"8KWithPool", 8192, true},
{"16K", 16384, false},
{"16KWithPool", 16384, true},
{"32K", 32768, false},
{"32KWithPool", 32768, true},
}
for _, tt := range tests {
@@ -540,6 +513,18 @@ func BenchmarkGet(b *testing.B) {
key := "foo"
value := []byte(strings.Repeat(" ", tt.size))
options := []Option{
WithMaxKeySize(len(key)),
WithMaxValueSize(tt.size),
}
if tt.withPool {
options = append(options, WithMemPool(1))
}
db, err := Open(testdir, options...)
if err != nil {
b.Fatal(err)
}
err = db.Put(key, value)
if err != nil {
b.Fatal(err)
@@ -555,6 +540,8 @@ func BenchmarkGet(b *testing.B) {
b.Errorf("unexpected value")
}
}
b.StopTimer()
db.Close()
})
}
}
@@ -578,15 +565,15 @@ func BenchmarkPut(b *testing.B) {
defer db.Close()
tests := []benchmarkTestCase{
{"128B", 128},
{"256B", 256},
{"512B", 512},
{"1K", 1024},
{"2K", 2048},
{"4K", 4096},
{"8K", 8192},
{"16K", 16384},
{"32K", 32768},
{"128B", 128, false},
{"256B", 256, false},
{"512B", 512, false},
{"1K", 1024, false},
{"2K", 2048, false},
{"4K", 4096, false},
{"8K", 8192, false},
{"16K", 16384, false},
{"32K", 32768, false},
}
for _, tt := range tests {

142
cmd/bitcask/export.go Normal file
View File

@@ -0,0 +1,142 @@
package main
import (
"encoding/base64"
"encoding/json"
"errors"
"io"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/prologic/bitcask"
)
var errNotAllDataWritten = errors.New("error: not all data written")
var exportCmd = &cobra.Command{
Use: "export",
Aliases: []string{"backup", "dump"},
Short: "Export a database",
Long: `This command allows you to export or dump/backup a database's
key/values into a long-term portable archival format suitable for backup and
restore purposes or migrating from older on-disk formats of Bitcask.
All key/value pairs are base64 encoded and serialized as JSON one pair per
line to form an output stream to either standard output or a file. You can
optionally compress the output with standard compression tools such as gzip.`,
Args: cobra.RangeArgs(0, 1),
Run: func(cmd *cobra.Command, args []string) {
var output string
path := viper.GetString("path")
if len(args) == 1 {
output = args[0]
} else {
output = "-"
}
os.Exit(export(path, output))
},
}
func init() {
RootCmd.AddCommand(exportCmd)
exportCmd.PersistentFlags().IntP(
"with-max-datafile-size", "", bitcask.DefaultMaxDatafileSize,
"Maximum size of each datafile",
)
exportCmd.PersistentFlags().IntP(
"with-max-key-size", "", bitcask.DefaultMaxKeySize,
"Maximum size of each key",
)
exportCmd.PersistentFlags().IntP(
"with-max-value-size", "", bitcask.DefaultMaxValueSize,
"Maximum size of each value",
)
}
type kvPair struct {
Key string `json:"key"`
Value string `json:"value"`
}
func export(path, output string) int {
var (
err error
w io.WriteCloser
)
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")
return 1
}
defer db.Close()
if output == "-" {
w = os.Stdout
} else {
w, err = os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_TRUNC, 0755)
if err != nil {
log.WithError(err).
WithField("output", output).
Error("error opening output for writing")
return 1
}
}
err = db.Fold(func(key string) error {
value, err := db.Get(key)
if err != nil {
log.WithError(err).
WithField("key", key).
Error("error reading key")
return err
}
kv := kvPair{
Key: base64.StdEncoding.EncodeToString([]byte(key)),
Value: base64.StdEncoding.EncodeToString(value),
}
data, err := json.Marshal(&kv)
if err != nil {
log.WithError(err).
WithField("key", key).
Error("error serialzing key")
return err
}
if n, err := w.Write(data); err != nil || n != len(data) {
if err == nil && n != len(data) {
err = errNotAllDataWritten
}
log.WithError(err).
WithField("key", key).
WithField("n", n).
Error("error writing key")
return err
}
if _, err := w.Write([]byte("\n")); err != nil {
log.WithError(err).Error("error writing newline")
return err
}
return nil
})
if err != nil {
log.WithError(err).
WithField("path", path).
WithField("output", output).
Error("error exporting keys")
return 2
}
return 0
}

106
cmd/bitcask/import.go Normal file
View File

@@ -0,0 +1,106 @@
package main
import (
"bufio"
"encoding/base64"
"encoding/json"
"io"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/prologic/bitcask"
)
var importCmd = &cobra.Command{
Use: "import",
Aliases: []string{"restore", "read"},
Short: "Import a database",
Long: `This command allows you to import or restore a database from a
previous export/dump using the export command either creating a new database
or adding additional key/value pairs to an existing one.`,
Args: cobra.RangeArgs(0, 1),
Run: func(cmd *cobra.Command, args []string) {
var input string
path := viper.GetString("path")
if len(args) == 1 {
input = args[0]
} else {
input = "-"
}
os.Exit(_import(path, input))
},
}
func init() {
RootCmd.AddCommand(importCmd)
}
func _import(path, input string) int {
var (
err error
r io.ReadCloser
)
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")
return 1
}
defer db.Close()
if input == "-" {
r = os.Stdin
} else {
r, err = os.Open(input)
if err != nil {
log.WithError(err).
WithField("input", input).
Error("error opening input for reading")
return 1
}
}
var kv kvPair
scanner := bufio.NewScanner(r)
for scanner.Scan() {
if err := json.Unmarshal(scanner.Bytes(), &kv); err != nil {
log.WithError(err).
WithField("input", input).
Error("error reading input")
return 2
}
key, err := base64.StdEncoding.DecodeString(kv.Key)
if err != nil {
log.WithError(err).Error("error decoding key")
return 2
}
value, err := base64.StdEncoding.DecodeString(kv.Value)
if err != nil {
log.WithError(err).Error("error decoding value")
return 2
}
if err := db.Put(string(key), value); err != nil {
log.WithError(err).Error("error writing key/value")
return 2
}
}
if err := scanner.Err(); err != nil {
log.WithError(err).
WithField("input", input).
Error("error reading input")
return 2
}
return 0
}

67
cmd/bitcask/initdb.go Normal file
View File

@@ -0,0 +1,67 @@
package main
import (
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/prologic/bitcask"
)
var initdbCmd = &cobra.Command{
Use: "initdb",
Aliases: []string{"create", "init"},
Short: "Initialize a new database",
Long: `This initializes a new database with persisted options`,
Args: cobra.ExactArgs(0),
PreRun: func(cmd *cobra.Command, args []string) {
viper.BindPFlag("with-max-datafile-size", cmd.Flags().Lookup("with-max-datafile-size"))
viper.SetDefault("with-max-datafile-size", bitcask.DefaultMaxDatafileSize)
viper.BindPFlag("with-max-key-size", cmd.Flags().Lookup("with-max-key-size"))
viper.SetDefault("with-max-key-size", bitcask.DefaultMaxKeySize)
viper.BindPFlag("with-max-value-size", cmd.Flags().Lookup("with-max-value-size"))
viper.SetDefault("with-max-value-size", bitcask.DefaultMaxValueSize)
},
Run: func(cmd *cobra.Command, args []string) {
path := viper.GetString("path")
maxDatafileSize := viper.GetInt("with-max-datafile-size")
maxKeySize := viper.GetInt("with-max-key-size")
maxValueSize := viper.GetInt("with-max-value-size")
db, err := bitcask.Open(
path,
bitcask.WithMaxDatafileSize(maxDatafileSize),
bitcask.WithMaxKeySize(maxKeySize),
bitcask.WithMaxValueSize(maxValueSize),
)
if err != nil {
log.WithError(err).Error("error opening database")
os.Exit(1)
}
defer db.Close()
os.Exit(0)
},
}
func init() {
RootCmd.AddCommand(initdbCmd)
initdbCmd.PersistentFlags().IntP(
"with-max-datafile-size", "", bitcask.DefaultMaxDatafileSize,
"Maximum size of each datafile",
)
initdbCmd.PersistentFlags().IntP(
"with-max-key-size", "", bitcask.DefaultMaxKeySize,
"Maximum size of each key",
)
initdbCmd.PersistentFlags().IntP(
"with-max-value-size", "", bitcask.DefaultMaxValueSize,
"Maximum size of each value",
)
}

View File

@@ -20,28 +20,23 @@ keys.`,
Args: cobra.ExactArgs(0),
Run: func(cmd *cobra.Command, args []string) {
path := viper.GetString("path")
force, err := cmd.Flags().GetBool("force")
if err != nil {
log.WithError(err).Error("error parsing force flag")
os.Exit(1)
}
os.Exit(merge(path, force))
os.Exit(merge(path))
},
}
func init() {
RootCmd.AddCommand(mergeCmd)
mergeCmd.Flags().BoolP(
"force", "f", false,
"Force a re-merge even if .hint files exist",
)
}
func merge(path string, force bool) int {
err := bitcask.Merge(path, force)
func merge(path string) int {
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")
return 1
}
if err = db.Merge(); err != nil {
log.WithError(err).Error("error merging database")
return 1
}

View File

@@ -13,11 +13,11 @@ import (
"github.com/prologic/bitcask"
)
var setCmd = &cobra.Command{
Use: "set <key> [<value>]",
Aliases: []string{"add"},
Short: "Add/Set a new Key/Value pair",
Long: `This adds or sets a new key/value pair.
var putCmd = &cobra.Command{
Use: "put <key> [<value>]",
Aliases: []string{"add", "set", "store"},
Short: "Adds a new Key/Value pair",
Long: `This adds a new key/value pair or modifies an existing one.
If the value is not specified as an argument it is read from standard input.`,
Args: cobra.MinimumNArgs(1),
@@ -33,15 +33,15 @@ If the value is not specified as an argument it is read from standard input.`,
value = os.Stdin
}
os.Exit(set(path, key, value))
os.Exit(put(path, key, value))
},
}
func init() {
RootCmd.AddCommand(setCmd)
RootCmd.AddCommand(putCmd)
}
func set(path, key string, value io.Reader) int {
func put(path, key string, value io.Reader) int {
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")

55
cmd/bitcask/stats.go Normal file
View File

@@ -0,0 +1,55 @@
package main
import (
"encoding/json"
"fmt"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/prologic/bitcask"
)
var statsCmd = &cobra.Command{
Use: "stats",
Aliases: []string{},
Short: "Display statis about the Database",
Long: `This displays statistics about the Database"`,
Args: cobra.ExactArgs(0),
Run: func(cmd *cobra.Command, args []string) {
path := viper.GetString("path")
os.Exit(stats(path))
},
}
func init() {
RootCmd.AddCommand(statsCmd)
}
func stats(path string) int {
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")
return 1
}
defer db.Close()
stats, err := db.Stats()
if err != nil {
log.WithError(err).Error("error getting stats")
return 1
}
data, err := json.MarshalIndent(stats, "", " ")
if err != nil {
log.WithError(err).Error("error marshalling stats")
return 1
}
fmt.Println(string(data))
return 0
}

13
doc.go Normal file
View File

@@ -0,0 +1,13 @@
// Package bitcask implements a high-performance key-value store based on a
// WAL and LSM.
//
// By default, the client assumes a default configuration regarding maximum key size,
// maximum value size, maximum datafile size, and memory pools to avoid allocations.
// Refer to Constants section to know default values.
//
// For extra performance, configure the memory pool option properly. This option
// requires to specify the maximum number of concurrent use of the package. Failing to
// set a high-enough value would impact latency and throughput. Likewise, overestimating
// would yield in an unnecessary big memory footprint.
// The default configuration doesn't use a memory pool.
package bitcask

14
doc_test.go Normal file
View File

@@ -0,0 +1,14 @@
package bitcask
func Example() {
_, _ = Open("path/to/db")
}
func Example_withOptions() {
opts := []Option{
WithMaxKeySize(1024),
WithMaxValueSize(4096),
WithMemPool(10),
}
_, _ = Open("path/to/db", opts...)
}

3
go.mod
View File

@@ -1,14 +1,15 @@
module github.com/prologic/bitcask
require (
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d
github.com/gofrs/flock v0.7.1
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.2
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/pelletier/go-toml v1.4.0 // indirect
github.com/pkg/errors v0.8.1
github.com/prologic/trie v0.0.0-20190322091023-3972df81f9b5
github.com/sirupsen/logrus v1.4.2
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cobra v0.0.5

6
go.sum
View File

@@ -21,6 +21,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d h1:TocZO8frNoxkwqFPePHFldSw8vLu+gBrlvFZYWqxiF4=
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d/go.mod h1:D6ICZm05D9VN1n/8iOtBxLpXtoGp6HDFUJ1RNVieOSE=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
@@ -77,6 +79,8 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg=
@@ -86,8 +90,6 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prologic/trie v0.0.0-20190322091023-3972df81f9b5 h1:H8dTZzU3aWNQnuRyiT45J9szv7EFakAhFzsFq27t3Uo=
github.com/prologic/trie v0.0.0-20190322091023-3972df81f9b5/go.mod h1:LFuDmpHJGmciXd8Rl5YMhVlLMps9gz2GtYLzwxrFhzs=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=

View File

@@ -1,26 +1,31 @@
package internal
import (
"bytes"
"fmt"
"os"
"path/filepath"
"sync"
"github.com/oxtoacart/bpool"
"github.com/pkg/errors"
"golang.org/x/exp/mmap"
"github.com/gogo/protobuf/proto"
pb "github.com/prologic/bitcask/internal/proto"
"github.com/prologic/bitcask/internal/streampb"
)
const (
DefaultDatafileFilename = "%09d.data"
prefixSize = 8
)
var (
ErrReadonly = errors.New("error: read only datafile")
ErrReadError = errors.New("error: read error")
memPool *bpool.BufferPool
mxMemPool sync.RWMutex
)
type Datafile struct {
@@ -136,7 +141,17 @@ func (df *Datafile) Read() (e pb.Entry, n int64, err error) {
func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
var n int
b := make([]byte, size)
var b []byte
if memPool == nil {
b = make([]byte, size)
} else {
poolSlice := memPool.Get()
if poolSlice.Cap() < int(size) {
poolSlice.Grow(int(size) - poolSlice.Cap())
}
defer memPool.Put(poolSlice)
b = poolSlice.Bytes()[:size]
}
if df.w == nil {
n, err = df.ra.ReadAt(b, index)
@@ -151,9 +166,10 @@ func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
return
}
buf := bytes.NewBuffer(b)
dec := streampb.NewDecoder(buf)
_, err = dec.Decode(&e)
err = proto.Unmarshal(b[prefixSize:], &e)
if err != nil {
return
}
return
}
@@ -175,3 +191,15 @@ func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {
return e.Offset, n, nil
}
// ConfigureMemPool configurate the mempool accordingly
func ConfigureMemPool(maxConcurrency *int) {
mxMemPool.Lock()
defer mxMemPool.Unlock()
if maxConcurrency == nil {
memPool = nil
} else {
memPool = bpool.NewBufferPool(*maxConcurrency)
}
return
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/gob"
"io"
"io/ioutil"
"os"
"sync"
)
@@ -41,17 +42,15 @@ func (k *Keydir) Add(key string, fileid int, offset, size int64) Item {
func (k *Keydir) Get(key string) (Item, bool) {
k.RLock()
defer k.RUnlock()
item, ok := k.kv[key]
k.RUnlock()
return item, ok
}
func (k *Keydir) Delete(key string) {
k.Lock()
defer k.Unlock()
delete(k.kv, key)
k.Unlock()
}
func (k *Keydir) Len() int {
@@ -62,11 +61,11 @@ func (k *Keydir) Keys() chan string {
ch := make(chan string)
go func() {
k.RLock()
defer k.RUnlock()
for key := range k.kv {
ch <- key
}
close(ch)
k.RUnlock()
}()
return ch
}
@@ -81,6 +80,21 @@ func (k *Keydir) Bytes() ([]byte, error) {
return buf.Bytes(), nil
}
func (k *Keydir) Load(fn string) error {
f, err := os.Open(fn)
if err != nil {
return err
}
defer f.Close()
dec := gob.NewDecoder(f)
if err := dec.Decode(&k.kv); err != nil {
return err
}
return nil
}
func (k *Keydir) Save(fn string) error {
data, err := k.Bytes()
if err != nil {

View File

@@ -2,12 +2,32 @@ package internal
import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
)
func Exists(path string) bool {
_, err := os.Stat(path)
return err == nil
}
func DirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += info.Size()
}
return err
})
return size, err
}
func GetDatafiles(path string) ([]string, error) {
fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
if err != nil {

View File

@@ -1,5 +1,12 @@
package bitcask
import (
"encoding/json"
"errors"
"io/ioutil"
"path/filepath"
)
const (
// DefaultMaxDatafileSize is the default maximum datafile size in bytes
DefaultMaxDatafileSize = 1 << 20 // 1MB
@@ -11,6 +18,12 @@ const (
DefaultMaxValueSize = 1 << 16 // 65KB
)
var (
// ErrMaxConcurrencyLowerEqZero is the error returned for
// maxConcurrency option not greater than zero
ErrMaxConcurrencyLowerEqZero = errors.New("error: maxConcurrency must be greater than zero")
)
// Option is a function that takes a config struct and modifies it
type Option func(*config) error
@@ -18,6 +31,44 @@ type config struct {
maxDatafileSize int
maxKeySize int
maxValueSize int
maxConcurrency *int
}
func (c *config) MarshalJSON() ([]byte, error) {
return json.Marshal(struct {
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
}{
MaxDatafileSize: c.maxDatafileSize,
MaxKeySize: c.maxKeySize,
MaxValueSize: c.maxValueSize,
})
}
func getConfig(path string) (*config, error) {
type Config struct {
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
}
var cfg Config
data, err := ioutil.ReadFile(filepath.Join(path, "config.json"))
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &cfg); err != nil {
return nil, err
}
return &config{
maxDatafileSize: cfg.MaxDatafileSize,
maxKeySize: cfg.MaxKeySize,
maxValueSize: cfg.MaxValueSize,
}, nil
}
func newDefaultConfig() *config {
@@ -51,3 +102,14 @@ func WithMaxValueSize(size int) Option {
return nil
}
}
// WithMemPool configures usage of a memory pool to avoid allocations
func WithMemPool(maxConcurrency int) Option {
return func(cfg *config) error {
if maxConcurrency <= 0 {
return ErrMaxConcurrencyLowerEqZero
}
cfg.maxConcurrency = &maxConcurrency
return nil
}
}