mirror of https://github.com/gogrlx/bitcask.git (synced 2026-04-04 12:02:46 -07:00)

## Compare commits

21 Commits
| Author | SHA1 | Date |
|---|---|---|
| | d0c913ccee | |
| | 6b372d8334 | |
| | 3c1808cad3 | |
| | 5d1dd6657a | |
| | 1ba9ca46e3 | |
| | 2a419c46d2 | |
| | e543fc38fb | |
| | 82e26449fa | |
| | bce2721be4 | |
| | f2b5515e03 | |
| | 8b684b635d | |
| | a407905ae2 | |
| | 6ceeccfd64 | |
| | 35dc7e70d2 | |
| | 6cc1154611 | |
| | 8aa66c66da | |
| | e3242c8426 | |
| | 912371645d | |
| | bc782a3083 | |
| | a2161179ef | |
| | 51bac21c0a | |
### AUTHORS (new file, 13 lines)

```
# Entries should be added alphabetically in the form:
# Name or Organization <email address>
# The email address is not required for organizations.

Awn Umar <awn@spacetime.dev>
Christian Muehlhaeuser <muesli@gmail.com>
Ignacio Hagopian <jsign.uy@gmail.com>
James Mills <prologic@shortcircuit.net.au>
Jesse Donat <donatj@gmail.com>
Kebert Xela kebertxela
panyun panyun
Whemoon Jang <palindrom615@gmail.com>
Yury Fedorov orlangure
```
```diff
@@ -1,8 +1,12 @@
 # Contributing

-No preference. If you know hot to use Github and contributed to open source projects before then:
+No preference. If you know how to use Github and have contributed to open source projects before then:

 * File an issue
 * Submit a pull request
 * File an issue + Submit a pull request
 * Use this project somewhere :)

 Be sure to add yourself to the [AUTHORS](/AUTHORS) file when you submit your PR(s). Every contribution counts no matter how big or small!

 Thanks for using Bitcask!
```
### README.md

````diff
@@ -4,14 +4,9 @@
 [](https://codecov.io/gh/prologic/bitcask)
 [](https://goreportcard.com/report/prologic/bitcask)
 [](https://godoc.org/github.com/prologic/bitcask)
 [](https://sourcegraph.com/github.com/prologic/bitcask?badge)
 [](https://github.com/prologic/bitcask/releases)
 [](https://github.com/prologic/bitcask)

-[](https://microbadger.com/images/prologic/bitcask)
-[](https://microbadger.com/images/prologic/bitcask)

-A high performance Key/Value store written in [Go](https://golang.org) with a predictable read/write performance and high throughput. Uses a [Bitcask](https://en.wikipedia.org/wiki/Bitcask) on-disk layout (LSM+WAL) similar to [Riak](https://riak.com/). 🗃️
+A high performance Key/Value store written in [Go](https://golang.org) with a predictable read/write performance and high throughput. Uses a [Bitcask](https://en.wikipedia.org/wiki/Bitcask) on-disk layout (LSM+WAL) similar to [Riak](https://riak.com/)

 For a more feature-complete Redis-compatible server, distributed key/value store have a look at [Bitraft](https://github.com/prologic/bitraft) which uses this library as its backend. Use [Bitcask](https://github.com/prologic/bitcask) as a starting point or if you want to embed in your application, use [Bitraft](https://github.com/prologic/bitraft) if you need a complete server/client solution with high availability with a Redis-compatible API.
````
````diff
@@ -21,14 +16,14 @@ For a more feature-complete Redis-compatible server, distributed key/value store
 * Builtin CLI (`bitcask`)
 * Builtin Redis-compatible server (`bitcaskd`)
 * Predictable read/write performance
-* Low latecny
+* Low latency
 * High throughput (See: [Performance](README.md#Performance) )

 ## Development

 1. Get the source

-```#!bash
+```#!sh
 $ git clone https://github.com/prologic/bitcask.git
 ```

@@ -37,20 +32,19 @@ $ git clone https://github.com/prologic/bitcask.git
 This library uses [Protobuf](https://github.com/protocolbuffers/protobuf) to serialize data on disk. Please follow the
 instructions for installing `protobuf` on your system. You will also need the
 following Go libraries/tools to generate Go code from Protobuf defs:
+- [protoc-gen-go](https://github.com/golang/protobuf)

 3. Build the project

-```#!bash
+```#!sh
 $ make
 ```

 This will invoke `go generate` and `go build`.

-- [protoc-gen-go](https://github.com/golang/protobuf)

 ## Install

-```#!bash
+```#!sh
 $ go get github.com/prologic/bitcask
 ```

@@ -58,7 +52,7 @@ $ go get github.com/prologic/bitcask

 Install the package into your project:

-```#!bash
+```#!sh
 $ go get github.com/prologic/bitcask
 ```
````
````diff
@@ -80,7 +74,7 @@ documentation and other examples.

 ## Usage (tool)

-```#!bash
+```#!sh
 $ bitcask -p /tmp/db set Hello World
 $ bitcask -p /tmp/db get Hello
 World

@@ -90,14 +84,14 @@ World

 There is also a builtin very simple Redis-compatible server called `bitcaskd`:

-```#!bash
+```#!sh
 $ ./bitcaskd ./tmp
 INFO[0000] starting bitcaskd v0.0.7@146f777 bind=":6379" path=./tmp
 ```

 Example session:

-```
+```#!sh
 $ telnet localhost 6379
 Trying ::1...
 Connected to localhost.

@@ -122,7 +116,7 @@ Connection closed by foreign host.

 You can also use the [Bitcask Docker Image](https://cloud.docker.com/u/prologic/repository/docker/prologic/bitcask):

-```#!bash
+```#!sh
 $ docker pull prologic/bitcask
 $ docker run -d -p 6379:6379 prologic/bitcask
 ```

@@ -131,39 +125,59 @@ $ docker run -d -p 6379:6379 prologic/bitcask

 Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7:

-```
+```#!sh
 $ make bench
 ...
-BenchmarkGet/128B-4   500000   2537 ns/op     672 B/op   7 allocs/op
-BenchmarkGet/256B-4   500000   2629 ns/op    1056 B/op   7 allocs/op
-BenchmarkGet/512B-4   500000   2773 ns/op    1888 B/op   7 allocs/op
-BenchmarkGet/1K-4     500000   3202 ns/op    3552 B/op   7 allocs/op
-BenchmarkGet/2K-4     300000   3904 ns/op    6880 B/op   7 allocs/op
-BenchmarkGet/4K-4     300000   5678 ns/op   14048 B/op   7 allocs/op
-BenchmarkGet/8K-4     200000   8948 ns/op   27360 B/op   7 allocs/op
-BenchmarkGet/16K-4    100000  14635 ns/op   53472 B/op   7 allocs/op
-BenchmarkGet/32K-4     50000  28292 ns/op  114912 B/op   7 allocs/op
 goos: darwin
 goarch: amd64
 pkg: github.com/prologic/bitcask

-BenchmarkPut/128B-4   200000    8173 ns/op    409 B/op   6 allocs/op
-BenchmarkPut/256B-4   200000    8404 ns/op    538 B/op   6 allocs/op
-BenchmarkPut/512B-4   200000    9741 ns/op    829 B/op   6 allocs/op
-BenchmarkPut/1K-4     100000   13118 ns/op   1411 B/op   6 allocs/op
-BenchmarkPut/2K-4     100000   17982 ns/op   2573 B/op   6 allocs/op
-BenchmarkPut/4K-4      50000   35477 ns/op   5154 B/op   6 allocs/op
-BenchmarkPut/8K-4      30000   54021 ns/op   9804 B/op   6 allocs/op
-BenchmarkPut/16K-4     20000   96551 ns/op  18849 B/op   6 allocs/op
-BenchmarkPut/32K-4     10000  129957 ns/op  41561 B/op   7 allocs/op
+BenchmarkGet/128B-4          300000   3913 ns/op    32.71 MB/s    387 B/op   4 allocs/op
+BenchmarkGet/128BWithPool-4  300000   4143 ns/op    30.89 MB/s    227 B/op   3 allocs/op
+BenchmarkGet/256B-4          300000   3919 ns/op    65.31 MB/s    643 B/op   4 allocs/op
+BenchmarkGet/256BWithPool-4  300000   4270 ns/op    59.95 MB/s    355 B/op   3 allocs/op
+BenchmarkGet/512B-4          300000   4248 ns/op   120.52 MB/s   1187 B/op   4 allocs/op
+BenchmarkGet/512BWithPool-4  300000   4676 ns/op   109.48 MB/s    611 B/op   3 allocs/op
+BenchmarkGet/1K-4            200000   5248 ns/op   195.10 MB/s   2275 B/op   4 allocs/op
+BenchmarkGet/1KWithPool-4    200000   5270 ns/op   194.28 MB/s   1123 B/op   3 allocs/op
+BenchmarkGet/2K-4            200000   6229 ns/op   328.74 MB/s   4451 B/op   4 allocs/op
+BenchmarkGet/2KWithPool-4    200000   6282 ns/op   325.99 MB/s   2147 B/op   3 allocs/op
+BenchmarkGet/4K-4            200000   9027 ns/op   453.74 MB/s   9059 B/op   4 allocs/op
+BenchmarkGet/4KWithPool-4    200000   8906 ns/op   459.87 MB/s   4195 B/op   3 allocs/op
+BenchmarkGet/8K-4            100000  12024 ns/op   681.28 MB/s  17763 B/op   4 allocs/op
+BenchmarkGet/8KWithPool-4    200000  11103 ns/op   737.79 MB/s   8291 B/op   3 allocs/op
+BenchmarkGet/16K-4           100000  16844 ns/op   972.65 MB/s  34915 B/op   4 allocs/op
+BenchmarkGet/16KWithPool-4   100000  14575 ns/op  1124.10 MB/s  16483 B/op   3 allocs/op
+BenchmarkGet/32K-4            50000  27770 ns/op  1179.97 MB/s  73827 B/op   4 allocs/op
+BenchmarkGet/32KWithPool-4   100000  24495 ns/op  1337.74 MB/s  32867 B/op   3 allocs/op

-BenchmarkScan-4  1000000  2011 ns/op  493 B/op  25 allocs/op
+BenchmarkPut/128B-4  100000   17492 ns/op   7.32 MB/s    441 B/op   6 allocs/op
+BenchmarkPut/256B-4  100000   17234 ns/op  14.85 MB/s    571 B/op   6 allocs/op
+BenchmarkPut/512B-4  100000   22837 ns/op  22.42 MB/s    861 B/op   6 allocs/op
+BenchmarkPut/1K-4     50000   30333 ns/op  33.76 MB/s   1443 B/op   6 allocs/op
+BenchmarkPut/2K-4     30000   45304 ns/op  45.21 MB/s   2606 B/op   6 allocs/op
+BenchmarkPut/4K-4     20000   83953 ns/op  48.79 MB/s   5187 B/op   6 allocs/op
+BenchmarkPut/8K-4     10000  142142 ns/op  57.63 MB/s   9845 B/op   6 allocs/op
+BenchmarkPut/16K-4     5000  206722 ns/op  79.26 MB/s  18884 B/op   6 allocs/op
+BenchmarkPut/32K-4     5000  361108 ns/op  90.74 MB/s  41582 B/op   7 allocs/op

+BenchmarkScan-4  1000000  1679 ns/op  408 B/op  16 allocs/op
 PASS
 ```

 For 128B values:

-* ~400,000 reads/sec
-* ~130,000 writes/sec
+* ~200,000 reads/sec
+* ~50,000 writes/sec

-The full benchmark above shows linear performance as you increase key/value sizes.
+The full benchmark above shows linear performance as you increase key/value sizes. Memory pooling starts to become advantageous for larger values.

 ## Contributors

 Thank you to all those that have contributed to this project, battle-tested it, used it in their own projects or products, fixed bugs, improved performance and even fixed tiny typos in documentation! Thank you and keep contributing!

 You can find an [AUTHORS](/AUTHORS) file where we keep a list of contributors to the project. If you contribute a PR please consider adding your name there. There is also Github's own [Contributors](https://github.com/prologic/bitcask/graphs/contributors) statistics.

 ## License

-bitcask is licensed under the [MIT License](https://github.com/prologic/bitcask/blob/master/LICENSE)
+bitcask is licensed under the terms of the [MIT License](https://github.com/prologic/bitcask/blob/master/LICENSE)
````
### bitcask.go

```diff
@@ -1,17 +1,18 @@
 package bitcask

 import (
+	"encoding/json"
 	"errors"
 	"hash/crc32"
 	"io"
 	"io/ioutil"
 	"os"
+	"path"
 	"path/filepath"
 	"strings"
 	"sync"

-	"github.com/derekparker/trie"
 	"github.com/gofrs/flock"
+	"github.com/prologic/trie"

 	"github.com/prologic/bitcask/internal"
 )
@@ -35,6 +36,10 @@ var (
 	// ErrDatabaseLocked is the error returned if the database is locked
 	// (typically opened by another process)
 	ErrDatabaseLocked = errors.New("error: database locked")
+
+	// ErrCreatingMemPool is the error returned when trying to configure
+	// the mempool fails
+	ErrCreatingMemPool = errors.New("error: creating the mempool failed")
 )

 // Bitcask is a struct that represents an on-disk LSM and WAL data structure
@@ -46,13 +51,38 @@ type Bitcask struct {
 	*flock.Flock

 	config    *config
+	options   []Option
 	path      string
 	curr      *internal.Datafile
 	keydir    *internal.Keydir
-	datafiles []*internal.Datafile
+	datafiles map[int]*internal.Datafile
 	trie      *trie.Trie
 }

+// Stats is a struct returned by Stats() on an open Bitcask instance
+type Stats struct {
+	Datafiles int
+	Keys      int
+	Size      int64
+}
+
+// Stats returns statistics about the database including the number of
+// data files, keys and overall size on disk of the data
+func (b *Bitcask) Stats() (stats Stats, err error) {
+	var size int64
+
+	size, err = internal.DirSize(b.path)
+	if err != nil {
+		return
+	}
+
+	stats.Datafiles = len(b.datafiles)
+	stats.Keys = b.keydir.Len()
+	stats.Size = size
+
+	return
+}
```
```diff
 // Close closes the database and removes the lock. It is important to call
 // Close() as this is the only way to cleanup the lock held by the open
 // database.
@@ -62,9 +92,16 @@ func (b *Bitcask) Close() error {
 		os.Remove(b.Flock.Path())
 	}()

-	for _, df := range b.datafiles {
-		df.Close()
+	if err := b.keydir.Save(path.Join(b.path, "index")); err != nil {
+		return err
+	}
+
+	for _, df := range b.datafiles {
+		if err := df.Close(); err != nil {
+			return err
+		}
 	}

 	return b.curr.Close()
 }
@@ -188,14 +225,16 @@ func (b *Bitcask) put(key string, value []byte) (int64, int64, error) {
 		return -1, 0, err
 	}

-	df, err := internal.NewDatafile(b.path, b.curr.FileID(), true)
+	id := b.curr.FileID()
+
+	df, err := internal.NewDatafile(b.path, id, true)
 	if err != nil {
 		return -1, 0, err
 	}

-	b.datafiles = append(b.datafiles, df)
+	b.datafiles[id] = df

-	id := b.curr.FileID() + 1
+	id = b.curr.FileID() + 1
 	curr, err := internal.NewDatafile(b.path, id, false)
 	if err != nil {
 		return -1, 0, err
@@ -207,176 +246,72 @@ func (b *Bitcask) put(key string, value []byte) (int64, int64, error) {
 	return b.curr.Write(e)
 }

-// Merge merges all datafiles in the database creating hint files for faster
-// startup. Old keys are squashed and deleted keys removed. Call this function
-// periodically to reclaim disk space.
-func Merge(path string, force bool) error {
-	fns, err := internal.GetDatafiles(path)
-	if err != nil {
-		return err
-	}
-
-	ids, err := internal.ParseIds(fns)
-	if err != nil {
-		return err
-	}
-
-	// Do not merge if we only have 1 Datafile
-	if len(ids) <= 1 {
-		return nil
-	}
-
-	// Don't merge the Active Datafile (the last one)
-	fns = fns[:len(fns)-1]
-	ids = ids[:len(ids)-1]
-
-	temp, err := ioutil.TempDir(path, "merge")
-	if err != nil {
-		return err
-	}
-	defer os.RemoveAll(temp)
-
-	for i, fn := range fns {
-		// Don't merge Datafiles whose .hint files we've already generated
-		// (they are already merged); unless we set the force flag to true
-		// (forcing a re-merge).
-		if filepath.Ext(fn) == ".hint" && !force {
-			// Already merged
-			continue
-		}
-
-		id := ids[i]
-
-		keydir := internal.NewKeydir()
-
-		df, err := internal.NewDatafile(path, id, true)
-		if err != nil {
-			return err
-		}
-		defer df.Close()
-
-		for {
-			e, n, err := df.Read()
-			if err != nil {
-				if err == io.EOF {
-					break
-				}
-				return err
-			}
-
-			// Tombstone value (deleted key)
-			if len(e.Value) == 0 {
-				keydir.Delete(e.Key)
-				continue
-			}
-
-			keydir.Add(e.Key, ids[i], e.Offset, n)
-		}
-
-		tempdf, err := internal.NewDatafile(temp, id, false)
-		if err != nil {
-			return err
-		}
-		defer tempdf.Close()
-
-		for key := range keydir.Keys() {
-			item, _ := keydir.Get(key)
-			e, err := df.ReadAt(item.Offset, item.Size)
-			if err != nil {
-				return err
-			}
-
-			_, _, err = tempdf.Write(e)
-			if err != nil {
-				return err
-			}
-		}
-
-		err = tempdf.Close()
+func (b *Bitcask) readConfig() error {
+	if internal.Exists(filepath.Join(b.path, "config.json")) {
+		data, err := ioutil.ReadFile(filepath.Join(b.path, "config.json"))
 		if err != nil {
 			return err
 		}
-
-		err = df.Close()
-		if err != nil {
-			return err
-		}
-
-		err = os.Rename(tempdf.Name(), df.Name())
-		if err != nil {
-			return err
-		}
-
-		hint := strings.TrimSuffix(df.Name(), ".data") + ".hint"
-		err = keydir.Save(hint)
-		if err != nil {
+		if err := json.Unmarshal(data, &b.config); err != nil {
 			return err
 		}
 	}

 	return nil
 }

-// Open opens the database at the given path with optional options.
-// Options can be provided with the `WithXXX` functions that provide
-// configuration options as functions.
-func Open(path string, options ...Option) (*Bitcask, error) {
-	if err := os.MkdirAll(path, 0755); err != nil {
-		return nil, err
-	}
-
-	err := Merge(path, false)
+func (b *Bitcask) writeConfig() error {
+	data, err := json.Marshal(b.config)
 	if err != nil {
-		return nil, err
+		return err
 	}
+	return ioutil.WriteFile(filepath.Join(b.path, "config.json"), data, 0600)
+}

-	fns, err := internal.GetDatafiles(path)
+func (b *Bitcask) reopen() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	fns, err := internal.GetDatafiles(b.path)
 	if err != nil {
-		return nil, err
+		return err
 	}

 	ids, err := internal.ParseIds(fns)
 	if err != nil {
-		return nil, err
+		return err
 	}

-	var datafiles []*internal.Datafile
+	datafiles := make(map[int]*internal.Datafile, len(ids))
+
+	for _, id := range ids {
+		df, err := internal.NewDatafile(b.path, id, true)
+		if err != nil {
+			return err
+		}
+		datafiles[id] = df
+	}

 	keydir := internal.NewKeydir()
 	trie := trie.New()

-	for i, fn := range fns {
-		df, err := internal.NewDatafile(path, ids[i], true)
-		if err != nil {
-			return nil, err
+	if internal.Exists(path.Join(b.path, "index")) {
+		if err := keydir.Load(path.Join(b.path, "index")); err != nil {
+			return err
 		}
-		datafiles = append(datafiles, df)
-
-		if filepath.Ext(fn) == ".hint" {
-			f, err := os.Open(filepath.Join(path, fn))
-			if err != nil {
-				return nil, err
-			}
-			defer f.Close()
-
-			hint, err := internal.NewKeydirFromBytes(f)
-			if err != nil {
-				return nil, err
-			}
-
-			for key := range hint.Keys() {
-				item, _ := hint.Get(key)
-				_ = keydir.Add(key, item.FileID, item.Offset, item.Size)
-				trie.Add(key, item)
-			}
-		} else {
+		for key := range keydir.Keys() {
+			item, _ := keydir.Get(key)
+			trie.Add(key, item)
+		}
+	} else {
+		for i, df := range datafiles {
 			for {
 				e, n, err := df.Read()
 				if err != nil {
 					if err == io.EOF {
 						break
 					}
-					return nil, err
+					return err
 				}

 				// Tombstone value (deleted key)
@@ -396,28 +331,134 @@ func Open(path string, options ...Option) (*Bitcask, error) {
 		id = ids[(len(ids) - 1)]
 	}

-	curr, err := internal.NewDatafile(path, id, false)
+	curr, err := internal.NewDatafile(b.path, id, false)
 	if err != nil {
+		return err
+	}
+
+	b.curr = curr
+	b.datafiles = datafiles
+	b.keydir = keydir
+	b.trie = trie
+
+	return nil
+}
+
+// Merge merges all datafiles in the database. Old keys are squashed
+// and deleted keys removed. Duplicate key/value pairs are also removed.
+// Call this function periodically to reclaim disk space.
+func (b *Bitcask) Merge() error {
+	// Temporary merged database path
+	temp, err := ioutil.TempDir(b.path, "merge")
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(temp)
+
+	// Create a merged database
+	mdb, err := Open(temp, b.options...)
+	if err != nil {
+		return err
+	}
+
+	// Rewrite all key/value pairs into merged database
+	// Doing this automatically strips deleted keys and
+	// old key/value pairs
+	err = b.Fold(func(key string) error {
+		value, err := b.Get(key)
+		if err != nil {
+			return err
+		}
+
+		if err := mdb.Put(key, value); err != nil {
+			return err
+		}
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	err = mdb.Close()
+	if err != nil {
+		return err
+	}
+
+	// Close the database
+	err = b.Close()
+	if err != nil {
+		return err
+	}
+
+	// Remove all data files
+	files, err := ioutil.ReadDir(b.path)
+	if err != nil {
+		return err
+	}
+	for _, file := range files {
+		if !file.IsDir() {
+			err := os.RemoveAll(path.Join([]string{b.path, file.Name()}...))
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	// Rename all merged data files
+	files, err = ioutil.ReadDir(mdb.path)
+	if err != nil {
+		return err
+	}
+	for _, file := range files {
+		err := os.Rename(
+			path.Join([]string{mdb.path, file.Name()}...),
+			path.Join([]string{b.path, file.Name()}...),
+		)
+		if err != nil {
+			return err
+		}
+	}
+
+	// And finally reopen the database
+	return b.reopen()
+}
+
+// Open opens the database at the given path with optional options.
+// Options can be provided with the `WithXXX` functions that provide
+// configuration options as functions.
+func Open(path string, options ...Option) (*Bitcask, error) {
+	var (
+		cfg *config
+		err error
+	)
+
+	if err := os.MkdirAll(path, 0755); err != nil {
+		return nil, err
+	}
+
+	cfg, err = getConfig(path)
+	if err != nil {
+		cfg = newDefaultConfig()
+	}
+
 	bitcask := &Bitcask{
-		Flock:     flock.New(filepath.Join(path, "lock")),
-		config:    newDefaultConfig(),
-		path:      path,
-		curr:      curr,
-		keydir:    keydir,
-		datafiles: datafiles,
-		trie:      trie,
+		Flock:   flock.New(filepath.Join(path, "lock")),
+		config:  cfg,
+		options: options,
+		path:    path,
 	}

 	for _, opt := range options {
-		err = opt(bitcask.config)
-		if err != nil {
+		if err := opt(bitcask.config); err != nil {
 			return nil, err
 		}
 	}

+	internal.ConfigureMemPool(bitcask.config.maxConcurrency)
+
 	locked, err := bitcask.Flock.TryLock()
 	if err != nil {
 		return nil, err
@@ -427,5 +468,26 @@ func Open(path string, options ...Option) (*Bitcask, error) {
 		return nil, ErrDatabaseLocked
 	}

+	if err := bitcask.writeConfig(); err != nil {
+		return nil, err
+	}
+
+	if err := bitcask.reopen(); err != nil {
+		return nil, err
+	}
+
 	return bitcask, nil
 }
+
+// Merge calls Bitcask.Merge()
+// XXX: Deprecated; Please use the `.Merge()` method
+// XXX: This is only kept here for backwards compatibility
+// it will be removed in future releases at some point
+func Merge(path string, force bool) error {
+	db, err := Open(path)
+	if err != nil {
+		return err
+	}
+
+	return db.Merge()
+}
```
### bitcask_test.go

```diff
@@ -211,38 +211,39 @@ func TestMaxValueSize(t *testing.T) {
 	})
 }

-func TestOpenMerge(t *testing.T) {
+func TestStats(t *testing.T) {
+	var (
+		db  *Bitcask
+		err error
+	)
+
 	assert := assert.New(t)

 	testdir, err := ioutil.TempDir("", "bitcask")
 	assert.NoError(err)

-	t.Run("Setup", func(t *testing.T) {
-		var (
-			db  *Bitcask
-			err error
-		)
-
-		t.Run("Open", func(t *testing.T) {
-			db, err = Open(testdir, WithMaxDatafileSize(32))
-			assert.NoError(err)
-		})
+	t.Run("Open", func(t *testing.T) {
+		db, err = Open(testdir)
+		assert.NoError(err)
+	})

-		t.Run("Put", func(t *testing.T) {
-			for i := 0; i < 1024; i++ {
-				err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
-				assert.NoError(err)
-			}
-		})
+	t.Run("Put", func(t *testing.T) {
+		err := db.Put("foo", []byte("bar"))
+		assert.NoError(err)
+	})

-		t.Run("Get", func(t *testing.T) {
-			for i := 0; i < 32; i++ {
-				err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
-				assert.NoError(err)
-				val, err := db.Get(string(i))
-				assert.NoError(err)
-				assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
-			}
-		})
+	t.Run("Get", func(t *testing.T) {
+		val, err := db.Get("foo")
+		assert.NoError(err)
+		assert.Equal([]byte("bar"), val)
+	})
+
+	t.Run("Stats", func(t *testing.T) {
+		stats, err := db.Stats()
+		assert.NoError(err)
+		assert.Equal(stats.Datafiles, 0)
+		assert.Equal(stats.Keys, 1)
+	})

 	t.Run("Sync", func(t *testing.T) {
@@ -255,34 +256,9 @@ func TestOpenMerge(t *testing.T) {
 			assert.NoError(err)
 		})
-	})

-	t.Run("Merge", func(t *testing.T) {
-		var (
-			db  *Bitcask
-			err error
-		)
-
-		t.Run("Open", func(t *testing.T) {
-			db, err = Open(testdir)
-			assert.NoError(err)
-		})
-
-		t.Run("Get", func(t *testing.T) {
-			for i := 0; i < 32; i++ {
-				val, err := db.Get(string(i))
-				assert.NoError(err)
-				assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
-			}
-		})
-
-		t.Run("Close", func(t *testing.T) {
-			err = db.Close()
-			assert.NoError(err)
-		})
-	})
 }

-func TestMergeOpen(t *testing.T) {
+func TestMerge(t *testing.T) {
 	var (
 		db  *Bitcask
 		err error
@@ -300,22 +276,40 @@ func TestMergeOpen(t *testing.T) {
 	})

 	t.Run("Put", func(t *testing.T) {
-		for i := 0; i < 1024; i++ {
-			err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
-			assert.NoError(err)
-		}
+		err := db.Put("foo", []byte("bar"))
+		assert.NoError(err)
 	})

+	s1, err := db.Stats()
+	assert.NoError(err)
+	assert.Equal(0, s1.Datafiles)
+	assert.Equal(1, s1.Keys)
+
+	t.Run("Put", func(t *testing.T) {
+		for i := 0; i < 10; i++ {
+			err := db.Put("foo", []byte("bar"))
+			assert.NoError(err)
+		}
+	})
+
-	t.Run("Get", func(t *testing.T) {
-		for i := 0; i < 32; i++ {
-			err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
-			assert.NoError(err)
-			val, err := db.Get(string(i))
-			assert.NoError(err)
-			assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
-		}
-	})
+	s2, err := db.Stats()
+	assert.NoError(err)
+	assert.Equal(5, s2.Datafiles)
+	assert.Equal(1, s2.Keys)
+	assert.True(s2.Size > s1.Size)
+
+	t.Run("Merge", func(t *testing.T) {
+		err := db.Merge()
+		assert.NoError(err)
+	})
+
+	s3, err := db.Stats()
+	assert.NoError(err)
+	assert.Equal(1, s3.Datafiles)
+	assert.Equal(1, s3.Keys)
+	assert.True(s3.Size > s1.Size)
+	assert.True(s3.Size < s2.Size)

 	t.Run("Sync", func(t *testing.T) {
 		err = db.Sync()
 		assert.NoError(err)
@@ -326,31 +320,6 @@ func TestMergeOpen(t *testing.T) {
 		assert.NoError(err)
 	})
-	})

-	t.Run("Merge", func(t *testing.T) {
-		t.Run("Merge", func(t *testing.T) {
-			err = Merge(testdir, true)
-			assert.NoError(err)
-		})
-
-		t.Run("Open", func(t *testing.T) {
-			db, err = Open(testdir)
-			assert.NoError(err)
-		})
-
-		t.Run("Get", func(t *testing.T) {
-			for i := 0; i < 32; i++ {
-				val, err := db.Get(string(i))
-				assert.NoError(err)
-				assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
-			}
-		})
-
-		t.Run("Close", func(t *testing.T) {
-			err = db.Close()
-			assert.NoError(err)
-		})
-	})
 }

 func TestConcurrent(t *testing.T) {
@@ -499,8 +468,9 @@ func TestLocking(t *testing.T) {
 }

 type benchmarkTestCase struct {
-	name string
-	size int
+	name     string
+	size     int
+	withPool bool
 }

 func BenchmarkGet(b *testing.B) {
@@ -515,22 +485,25 @@ func BenchmarkGet(b *testing.B) {
 	}
 	defer os.RemoveAll(testdir)

-	db, err := Open(testdir)
-	if err != nil {
-		b.Fatal(err)
-	}
-	defer db.Close()
-
 	tests := []benchmarkTestCase{
-		{"128B", 128},
-		{"256B", 256},
-		{"512B", 512},
-		{"1K", 1024},
-		{"2K", 2048},
-		{"4K", 4096},
-		{"8K", 8192},
-		{"16K", 16384},
-		{"32K", 32768},
+		{"128B", 128, false},
+		{"128BWithPool", 128, true},
+		{"256B", 256, false},
+		{"256BWithPool", 256, true},
+		{"512B", 512, false},
+		{"512BWithPool", 512, true},
+		{"1K", 1024, false},
+		{"1KWithPool", 1024, true},
+		{"2K", 2048, false},
+		{"2KWithPool", 2048, true},
+		{"4K", 4096, false},
+		{"4KWithPool", 4096, true},
+		{"8K", 8192, false},
+		{"8KWithPool", 8192, true},
+		{"16K", 16384, false},
+		{"16KWithPool", 16384, true},
+		{"32K", 32768, false},
+		{"32KWithPool", 32768, true},
 	}

 	for _, tt := range tests {
@@ -540,6 +513,18 @@ func BenchmarkGet(b *testing.B) {
 			key := "foo"
 			value := []byte(strings.Repeat(" ", tt.size))

+			options := []Option{
+				WithMaxKeySize(len(key)),
+				WithMaxValueSize(tt.size),
+			}
+			if tt.withPool {
+				options = append(options, WithMemPool(1))
+			}
+			db, err := Open(testdir, options...)
+			if err != nil {
+				b.Fatal(err)
+			}
+
 			err = db.Put(key, value)
 			if err != nil {
 				b.Fatal(err)
@@ -555,6 +540,8 @@ func BenchmarkGet(b *testing.B) {
 					b.Errorf("unexpected value")
 				}
 			}
+			b.StopTimer()
+			db.Close()
 		})
 	}
 }
@@ -578,15 +565,15 @@ func BenchmarkPut(b *testing.B) {
 	defer db.Close()

 	tests := []benchmarkTestCase{
-		{"128B", 128},
-		{"256B", 256},
-		{"512B", 512},
-		{"1K", 1024},
-		{"2K", 2048},
-		{"4K", 4096},
-		{"8K", 8192},
-		{"16K", 16384},
-		{"32K", 32768},
+		{"128B", 128, false},
+		{"256B", 256, false},
+		{"512B", 512, false},
+		{"1K", 1024, false},
+		{"2K", 2048, false},
+		{"4K", 4096, false},
+		{"8K", 8192, false},
+		{"16K", 16384, false},
+		{"32K", 32768, false},
 	}

 	for _, tt := range tests {
```
142
cmd/bitcask/export.go
Normal file
@@ -0,0 +1,142 @@
package main

import (
    "encoding/base64"
    "encoding/json"
    "errors"
    "io"
    "os"

    log "github.com/sirupsen/logrus"
    "github.com/spf13/cobra"
    "github.com/spf13/viper"

    "github.com/prologic/bitcask"
)

var errNotAllDataWritten = errors.New("error: not all data written")

var exportCmd = &cobra.Command{
    Use:     "export",
    Aliases: []string{"backup", "dump"},
    Short:   "Export a database",
    Long: `This command allows you to export or dump/backup a database's
key/values into a long-term portable archival format suitable for backup and
restore purposes or migrating from older on-disk formats of Bitcask.

All key/value pairs are base64 encoded and serialized as JSON one pair per
line to form an output stream to either standard output or a file. You can
optionally compress the output with standard compression tools such as gzip.`,
    Args: cobra.RangeArgs(0, 1),
    Run: func(cmd *cobra.Command, args []string) {
        var output string

        path := viper.GetString("path")

        if len(args) == 1 {
            output = args[0]
        } else {
            output = "-"
        }

        os.Exit(export(path, output))
    },
}

func init() {
    RootCmd.AddCommand(exportCmd)

    exportCmd.PersistentFlags().IntP(
        "with-max-datafile-size", "", bitcask.DefaultMaxDatafileSize,
        "Maximum size of each datafile",
    )
    exportCmd.PersistentFlags().IntP(
        "with-max-key-size", "", bitcask.DefaultMaxKeySize,
        "Maximum size of each key",
    )
    exportCmd.PersistentFlags().IntP(
        "with-max-value-size", "", bitcask.DefaultMaxValueSize,
        "Maximum size of each value",
    )
}

type kvPair struct {
    Key   string `json:"key"`
    Value string `json:"value"`
}

func export(path, output string) int {
    var (
        err error
        w   io.WriteCloser
    )

    db, err := bitcask.Open(path)
    if err != nil {
        log.WithError(err).Error("error opening database")
        return 1
    }
    defer db.Close()

    if output == "-" {
        w = os.Stdout
    } else {
        w, err = os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_TRUNC, 0755)
        if err != nil {
            log.WithError(err).
                WithField("output", output).
                Error("error opening output for writing")
            return 1
        }
    }

    err = db.Fold(func(key string) error {
        value, err := db.Get(key)
        if err != nil {
            log.WithError(err).
                WithField("key", key).
                Error("error reading key")
            return err
        }

        kv := kvPair{
            Key:   base64.StdEncoding.EncodeToString([]byte(key)),
            Value: base64.StdEncoding.EncodeToString(value),
        }

        data, err := json.Marshal(&kv)
        if err != nil {
            log.WithError(err).
                WithField("key", key).
                Error("error serializing key")
            return err
        }

        if n, err := w.Write(data); err != nil || n != len(data) {
            if err == nil && n != len(data) {
                err = errNotAllDataWritten
            }
            log.WithError(err).
                WithField("key", key).
                WithField("n", n).
                Error("error writing key")
            return err
        }

        if _, err := w.Write([]byte("\n")); err != nil {
            log.WithError(err).Error("error writing newline")
            return err
        }

        return nil
    })
    if err != nil {
        log.WithError(err).
            WithField("path", path).
            WithField("output", output).
            Error("error exporting keys")
        return 2
    }

    return 0
}
106
cmd/bitcask/import.go
Normal file
@@ -0,0 +1,106 @@
package main

import (
    "bufio"
    "encoding/base64"
    "encoding/json"
    "io"
    "os"

    log "github.com/sirupsen/logrus"
    "github.com/spf13/cobra"
    "github.com/spf13/viper"

    "github.com/prologic/bitcask"
)

var importCmd = &cobra.Command{
    Use:     "import",
    Aliases: []string{"restore", "read"},
    Short:   "Import a database",
    Long: `This command allows you to import or restore a database from a
previous export/dump using the export command either creating a new database
or adding additional key/value pairs to an existing one.`,
    Args: cobra.RangeArgs(0, 1),
    Run: func(cmd *cobra.Command, args []string) {
        var input string

        path := viper.GetString("path")

        if len(args) == 1 {
            input = args[0]
        } else {
            input = "-"
        }

        os.Exit(_import(path, input))
    },
}

func init() {
    RootCmd.AddCommand(importCmd)
}

func _import(path, input string) int {
    var (
        err error
        r   io.ReadCloser
    )

    db, err := bitcask.Open(path)
    if err != nil {
        log.WithError(err).Error("error opening database")
        return 1
    }
    defer db.Close()

    if input == "-" {
        r = os.Stdin
    } else {
        r, err = os.Open(input)
        if err != nil {
            log.WithError(err).
                WithField("input", input).
                Error("error opening input for reading")
            return 1
        }
    }

    var kv kvPair

    scanner := bufio.NewScanner(r)
    for scanner.Scan() {
        if err := json.Unmarshal(scanner.Bytes(), &kv); err != nil {
            log.WithError(err).
                WithField("input", input).
                Error("error reading input")
            return 2
        }

        key, err := base64.StdEncoding.DecodeString(kv.Key)
        if err != nil {
            log.WithError(err).Error("error decoding key")
            return 2
        }

        value, err := base64.StdEncoding.DecodeString(kv.Value)
        if err != nil {
            log.WithError(err).Error("error decoding value")
            return 2
        }

        if err := db.Put(string(key), value); err != nil {
            log.WithError(err).Error("error writing key/value")
            return 2
        }
    }
    if err := scanner.Err(); err != nil {
        log.WithError(err).
            WithField("input", input).
            Error("error reading input")
        return 2
    }

    return 0
}
67
cmd/bitcask/initdb.go
Normal file
@@ -0,0 +1,67 @@
package main

import (
    "os"

    log "github.com/sirupsen/logrus"
    "github.com/spf13/cobra"
    "github.com/spf13/viper"

    "github.com/prologic/bitcask"
)

var initdbCmd = &cobra.Command{
    Use:     "initdb",
    Aliases: []string{"create", "init"},
    Short:   "Initialize a new database",
    Long:    `This initializes a new database with persisted options`,
    Args:    cobra.ExactArgs(0),
    PreRun: func(cmd *cobra.Command, args []string) {
        viper.BindPFlag("with-max-datafile-size", cmd.Flags().Lookup("with-max-datafile-size"))
        viper.SetDefault("with-max-datafile-size", bitcask.DefaultMaxDatafileSize)

        viper.BindPFlag("with-max-key-size", cmd.Flags().Lookup("with-max-key-size"))
        viper.SetDefault("with-max-key-size", bitcask.DefaultMaxKeySize)

        viper.BindPFlag("with-max-value-size", cmd.Flags().Lookup("with-max-value-size"))
        viper.SetDefault("with-max-value-size", bitcask.DefaultMaxValueSize)
    },
    Run: func(cmd *cobra.Command, args []string) {
        path := viper.GetString("path")

        maxDatafileSize := viper.GetInt("with-max-datafile-size")
        maxKeySize := viper.GetInt("with-max-key-size")
        maxValueSize := viper.GetInt("with-max-value-size")

        db, err := bitcask.Open(
            path,
            bitcask.WithMaxDatafileSize(maxDatafileSize),
            bitcask.WithMaxKeySize(maxKeySize),
            bitcask.WithMaxValueSize(maxValueSize),
        )
        if err != nil {
            log.WithError(err).Error("error opening database")
            os.Exit(1)
        }
        defer db.Close()

        os.Exit(0)
    },
}

func init() {
    RootCmd.AddCommand(initdbCmd)

    initdbCmd.PersistentFlags().IntP(
        "with-max-datafile-size", "", bitcask.DefaultMaxDatafileSize,
        "Maximum size of each datafile",
    )
    initdbCmd.PersistentFlags().IntP(
        "with-max-key-size", "", bitcask.DefaultMaxKeySize,
        "Maximum size of each key",
    )
    initdbCmd.PersistentFlags().IntP(
        "with-max-value-size", "", bitcask.DefaultMaxValueSize,
        "Maximum size of each value",
    )
}
@@ -20,28 +20,23 @@ keys.`,
    Args: cobra.ExactArgs(0),
    Run: func(cmd *cobra.Command, args []string) {
        path := viper.GetString("path")
        force, err := cmd.Flags().GetBool("force")
        if err != nil {
            log.WithError(err).Error("error parsing force flag")
            os.Exit(1)
        }

        os.Exit(merge(path, force))
        os.Exit(merge(path))
    },
}

func init() {
    RootCmd.AddCommand(mergeCmd)

    mergeCmd.Flags().BoolP(
        "force", "f", false,
        "Force a re-merge even if .hint files exist",
    )
}

func merge(path string, force bool) int {
    err := bitcask.Merge(path, force)
func merge(path string) int {
    db, err := bitcask.Open(path)
    if err != nil {
        log.WithError(err).Error("error opening database")
        return 1
    }

    if err = db.Merge(); err != nil {
        log.WithError(err).Error("error merging database")
        return 1
    }
@@ -13,11 +13,11 @@ import (
    "github.com/prologic/bitcask"
)

var setCmd = &cobra.Command{
    Use:     "set <key> [<value>]",
    Aliases: []string{"add"},
    Short:   "Add/Set a new Key/Value pair",
    Long: `This adds or sets a new key/value pair.
var putCmd = &cobra.Command{
    Use:     "put <key> [<value>]",
    Aliases: []string{"add", "set", "store"},
    Short:   "Adds a new Key/Value pair",
    Long: `This adds a new key/value pair or modifies an existing one.

If the value is not specified as an argument it is read from standard input.`,
    Args: cobra.MinimumNArgs(1),
@@ -33,15 +33,15 @@ If the value is not specified as an argument it is read from standard input.`,
            value = os.Stdin
        }

        os.Exit(set(path, key, value))
        os.Exit(put(path, key, value))
    },
}

func init() {
    RootCmd.AddCommand(setCmd)
    RootCmd.AddCommand(putCmd)
}

func set(path, key string, value io.Reader) int {
func put(path, key string, value io.Reader) int {
    db, err := bitcask.Open(path)
    if err != nil {
        log.WithError(err).Error("error opening database")
55
cmd/bitcask/stats.go
Normal file
@@ -0,0 +1,55 @@
package main

import (
    "encoding/json"
    "fmt"
    "os"

    log "github.com/sirupsen/logrus"
    "github.com/spf13/cobra"
    "github.com/spf13/viper"

    "github.com/prologic/bitcask"
)

var statsCmd = &cobra.Command{
    Use:     "stats",
    Aliases: []string{},
    Short:   "Display stats about the Database",
    Long:    `This displays statistics about the Database`,
    Args:    cobra.ExactArgs(0),
    Run: func(cmd *cobra.Command, args []string) {
        path := viper.GetString("path")

        os.Exit(stats(path))
    },
}

func init() {
    RootCmd.AddCommand(statsCmd)
}

func stats(path string) int {
    db, err := bitcask.Open(path)
    if err != nil {
        log.WithError(err).Error("error opening database")
        return 1
    }
    defer db.Close()

    stats, err := db.Stats()
    if err != nil {
        log.WithError(err).Error("error getting stats")
        return 1
    }

    data, err := json.MarshalIndent(stats, "", " ")
    if err != nil {
        log.WithError(err).Error("error marshalling stats")
        return 1
    }

    fmt.Println(string(data))

    return 0
}
13
doc.go
Normal file
@@ -0,0 +1,13 @@
// Package bitcask implements a high-performance key-value store based on a
// WAL and LSM.
//
// By default, the client assumes a default configuration regarding maximum key size,
// maximum value size, maximum datafile size, and memory pools to avoid allocations.
// Refer to the Constants section for the default values.
//
// For extra performance, configure the memory pool option properly. This option
// requires specifying the maximum number of concurrent uses of the package. Setting
// too low a value would impact latency and throughput; likewise, overestimating it
// would yield an unnecessarily big memory footprint.
// The default configuration doesn't use a memory pool.
package bitcask
14
doc_test.go
Normal file
@@ -0,0 +1,14 @@
package bitcask

func Example() {
    _, _ = Open("path/to/db")
}

func Example_withOptions() {
    opts := []Option{
        WithMaxKeySize(1024),
        WithMaxValueSize(4096),
        WithMemPool(10),
    }
    _, _ = Open("path/to/db", opts...)
}
3
go.mod
@@ -1,14 +1,15 @@
module github.com/prologic/bitcask

require (
    github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d
    github.com/gofrs/flock v0.7.1
    github.com/gogo/protobuf v1.2.1
    github.com/golang/protobuf v1.3.2
    github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
    github.com/magiconair/properties v1.8.1 // indirect
    github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
    github.com/pelletier/go-toml v1.4.0 // indirect
    github.com/pkg/errors v0.8.1
    github.com/prologic/trie v0.0.0-20190322091023-3972df81f9b5
    github.com/sirupsen/logrus v1.4.2
    github.com/spf13/afero v1.2.2 // indirect
    github.com/spf13/cobra v0.0.5
6
go.sum
@@ -21,6 +21,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d h1:TocZO8frNoxkwqFPePHFldSw8vLu+gBrlvFZYWqxiF4=
github.com/derekparker/trie v0.0.0-20190805173922-4e1a77fb815d/go.mod h1:D6ICZm05D9VN1n/8iOtBxLpXtoGp6HDFUJ1RNVieOSE=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
@@ -77,6 +79,8 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg=
@@ -86,8 +90,6 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prologic/trie v0.0.0-20190322091023-3972df81f9b5 h1:H8dTZzU3aWNQnuRyiT45J9szv7EFakAhFzsFq27t3Uo=
github.com/prologic/trie v0.0.0-20190322091023-3972df81f9b5/go.mod h1:LFuDmpHJGmciXd8Rl5YMhVlLMps9gz2GtYLzwxrFhzs=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
@@ -1,26 +1,31 @@
package internal

import (
    "bytes"
    "fmt"
    "os"
    "path/filepath"
    "sync"

    "github.com/oxtoacart/bpool"
    "github.com/pkg/errors"
    "golang.org/x/exp/mmap"

    "github.com/gogo/protobuf/proto"
    pb "github.com/prologic/bitcask/internal/proto"
    "github.com/prologic/bitcask/internal/streampb"
)

const (
    DefaultDatafileFilename = "%09d.data"
    prefixSize              = 8
)

var (
    ErrReadonly  = errors.New("error: read only datafile")
    ErrReadError = errors.New("error: read error")

    memPool   *bpool.BufferPool
    mxMemPool sync.RWMutex
)

type Datafile struct {
@@ -136,7 +141,17 @@ func (df *Datafile) Read() (e pb.Entry, n int64, err error) {
func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
    var n int

    b := make([]byte, size)
    var b []byte
    if memPool == nil {
        b = make([]byte, size)
    } else {
        poolSlice := memPool.Get()
        if poolSlice.Cap() < int(size) {
            poolSlice.Grow(int(size) - poolSlice.Cap())
        }
        defer memPool.Put(poolSlice)
        b = poolSlice.Bytes()[:size]
    }

    if df.w == nil {
        n, err = df.ra.ReadAt(b, index)
@@ -151,9 +166,10 @@ func (df *Datafile) ReadAt(index, size int64) (e pb.Entry, err error) {
        return
    }

    buf := bytes.NewBuffer(b)
    dec := streampb.NewDecoder(buf)
    _, err = dec.Decode(&e)
    err = proto.Unmarshal(b[prefixSize:], &e)
    if err != nil {
        return
    }
    return
}

@@ -175,3 +191,15 @@ func (df *Datafile) Write(e pb.Entry) (int64, int64, error) {

    return e.Offset, n, nil
}

// ConfigureMemPool configures the memory pool accordingly
func ConfigureMemPool(maxConcurrency *int) {
    mxMemPool.Lock()
    defer mxMemPool.Unlock()
    if maxConcurrency == nil {
        memPool = nil
    } else {
        memPool = bpool.NewBufferPool(*maxConcurrency)
    }
    return
}
@@ -5,6 +5,7 @@ import (
    "encoding/gob"
    "io"
    "io/ioutil"
    "os"
    "sync"
)

@@ -41,17 +42,15 @@ func (k *Keydir) Add(key string, fileid int, offset, size int64) Item {

func (k *Keydir) Get(key string) (Item, bool) {
    k.RLock()
    defer k.RUnlock()

    item, ok := k.kv[key]
    k.RUnlock()
    return item, ok
}

func (k *Keydir) Delete(key string) {
    k.Lock()
    defer k.Unlock()

    delete(k.kv, key)
    k.Unlock()
}

func (k *Keydir) Len() int {
@@ -62,11 +61,11 @@ func (k *Keydir) Keys() chan string {
    ch := make(chan string)
    go func() {
        k.RLock()
        defer k.RUnlock()
        for key := range k.kv {
            ch <- key
        }
        close(ch)
        k.RUnlock()
    }()
    return ch
}
@@ -81,6 +80,21 @@ func (k *Keydir) Bytes() ([]byte, error) {
    return buf.Bytes(), nil
}

func (k *Keydir) Load(fn string) error {
    f, err := os.Open(fn)
    if err != nil {
        return err
    }
    defer f.Close()

    dec := gob.NewDecoder(f)
    if err := dec.Decode(&k.kv); err != nil {
        return err
    }

    return nil
}

func (k *Keydir) Save(fn string) error {
    data, err := k.Bytes()
    if err != nil {
@@ -2,12 +2,32 @@ package internal

import (
    "fmt"
    "os"
    "path/filepath"
    "sort"
    "strconv"
    "strings"
)

func Exists(path string) bool {
    _, err := os.Stat(path)
    return err == nil
}

func DirSize(path string) (int64, error) {
    var size int64
    err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.IsDir() {
            size += info.Size()
        }
        return err
    })
    return size, err
}

func GetDatafiles(path string) ([]string, error) {
    fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
    if err != nil {
62
options.go
@@ -1,5 +1,12 @@
package bitcask

import (
    "encoding/json"
    "errors"
    "io/ioutil"
    "path/filepath"
)

const (
    // DefaultMaxDatafileSize is the default maximum datafile size in bytes
    DefaultMaxDatafileSize = 1 << 20 // 1MB
@@ -11,6 +18,12 @@ const (
    DefaultMaxValueSize = 1 << 16 // 65KB
)

var (
    // ErrMaxConcurrencyLowerEqZero is the error returned for
    // maxConcurrency option not greater than zero
    ErrMaxConcurrencyLowerEqZero = errors.New("error: maxConcurrency must be greater than zero")
)

// Option is a function that takes a config struct and modifies it
type Option func(*config) error

@@ -18,6 +31,44 @@ type config struct {
    maxDatafileSize int
    maxKeySize      int
    maxValueSize    int
    maxConcurrency  *int
}

func (c *config) MarshalJSON() ([]byte, error) {
    return json.Marshal(struct {
        MaxDatafileSize int `json:"max_datafile_size"`
        MaxKeySize      int `json:"max_key_size"`
        MaxValueSize    int `json:"max_value_size"`
    }{
        MaxDatafileSize: c.maxDatafileSize,
        MaxKeySize:      c.maxKeySize,
        MaxValueSize:    c.maxValueSize,
    })
}

func getConfig(path string) (*config, error) {
    type Config struct {
        MaxDatafileSize int `json:"max_datafile_size"`
        MaxKeySize      int `json:"max_key_size"`
        MaxValueSize    int `json:"max_value_size"`
    }

    var cfg Config

    data, err := ioutil.ReadFile(filepath.Join(path, "config.json"))
    if err != nil {
        return nil, err
    }

    if err := json.Unmarshal(data, &cfg); err != nil {
        return nil, err
    }

    return &config{
        maxDatafileSize: cfg.MaxDatafileSize,
        maxKeySize:      cfg.MaxKeySize,
        maxValueSize:    cfg.MaxValueSize,
    }, nil
}

func newDefaultConfig() *config {
@@ -51,3 +102,14 @@ func WithMaxValueSize(size int) Option {
        return nil
    }
}

// WithMemPool configures usage of a memory pool to avoid allocations
func WithMemPool(maxConcurrency int) Option {
    return func(cfg *config) error {
        if maxConcurrency <= 0 {
            return ErrMaxConcurrencyLowerEqZero
        }
        cfg.maxConcurrency = &maxConcurrency
        return nil
    }
}